Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.solr.cloud;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 import java.net.URI;
 import java.util.List;
 
 
Kindly borrowed the idea and base implementation from the ActiveMQ project; useful for blocking traffic on a specified port.
 
 public class SocketProxy {
   
   private static final transient Logger log = LoggerFactory.getLogger(SocketProxy.class);
   
   public static final int ACCEPT_TIMEOUT_MILLIS = 100;
   
   private URI proxyUrl;
   private URI target;
   
   private Acceptor acceptor;
   private ServerSocket serverSocket;
   
   private CountDownLatch closed = new CountDownLatch(1);
   
   public List<Bridgeconnections = new LinkedList<Bridge>();
   
   private int listenPort = 0;
   
   private int receiveBufferSize = -1;
   
   private boolean pauseAtStart = false;
   
   private int acceptBacklog = 50;
   
   public SocketProxy() throws Exception {}
   
   public SocketProxy(URI urithrows Exception {
     this(0, uri);
   }
   
   public SocketProxy(int portURI urithrows Exception {
      = port;
      = uri;
     open();
   }
   
   public String toString() {
     return "SocketyProxy: port="++"; target="+;
   }
     
   public void setReceiveBufferSize(int receiveBufferSize) {
     this. = receiveBufferSize;
   }
   
   public void setTarget(URI tcpBrokerUri) {
      = tcpBrokerUri;
   }
   
   public void open() throws Exception {
     if ( > 0) {
     }
    if ( == null) {
    } else {
    }
    if () {
      .pause();
    }
    new Thread(null"SocketProxy-Acceptor-"
        + .getLocalPort()).start();
     = new CountDownLatch(1);
  }
  
  private boolean isSsl(URI target) {
    return "ssl".equals(target.getScheme());
  }
  
  private ServerSocket createServerSocket(URI targetthrows Exception {
    if (isSsl(target)) {
      return SSLServerSocketFactory.getDefault().createServerSocket();
    }
    return new ServerSocket();
  }
  
  private Socket createSocket(URI targetthrows Exception {
    if (isSsl(target)) {
      return SSLSocketFactory.getDefault().createSocket();
    }
    return new Socket();
  }
  
  public URI getUrl() {
    return ;
  }
  
  /*
   * close all proxy connections and acceptor
   */
  public void close() {
    List<Bridgeconnections;
    synchronized (this.) {
      connections = new ArrayList<Bridge>(this.);
    }
    .warn("Closing " + connections.size()+" connections to: "+getUrl());
    for (Bridge con : connections) {
      closeConnection(con);
    }
    .close();
    .countDown();
  }
  
  /*
   * close all proxy receive connections, leaving acceptor open
   */
  public void halfClose() {
    List<Bridgeconnections;
    synchronized (this.) {
      connections = new ArrayList<Bridge>(this.);
    }
    .info("halfClose, numConnections=" + connections.size());
    for (Bridge con : connections) {
      halfCloseConnection(con);
    }
  }
  
  public boolean waitUntilClosed(long timeoutSeconds)
      throws InterruptedException {
    return .await(timeoutSeconds.);
  }
  
  /*
   * called after a close to restart the acceptor on the same port
   */
  public void reopen() {
    .info("Re-opening connectivity to "+getUrl());
    try {
      open();
    } catch (Exception e) {
      .debug("exception on reopen url:" + getUrl(), e);
    }
  }
  
  /*
   * pause accepting new connections and data transfer through existing proxy
   * connections. All sockets remain open
   */
  public void pause() {
    synchronized () {
      .info("pause, numConnections=" + .size());
      .pause();
      for (Bridge con : ) {
        con.pause();
      }
    }
  }
  
  /*
   * continue after pause
   */
  public void goOn() {
    synchronized () {
      .info("goOn, numConnections=" + .size());
      for (Bridge con : ) {
        con.goOn();
      }
    }
    .goOn();
  }
  
  private void closeConnection(Bridge c) {
    try {
      c.close();
    } catch (Exception e) {
      .debug("exception on close of: " + ce);
    }
  }
  
  private void halfCloseConnection(Bridge c) {
    try {
      c.halfClose();
    } catch (Exception e) {
      .debug("exception on half close of: " + ce);
    }
  }
  
  public boolean isPauseAtStart() {
    return ;
  }
  
  public void setPauseAtStart(boolean pauseAtStart) {
    this. = pauseAtStart;
  }
  
  public int getAcceptBacklog() {
    return ;
  }
  
  public void setAcceptBacklog(int acceptBacklog) {
    this. = acceptBacklog;
  }
  
  private URI urlFromSocket(URI uriServerSocket serverSocket)
      throws Exception {
    int listenPort = serverSocket.getLocalPort();
    
    return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
        listenPorturi.getPath(), uri.getQuery(), uri.getFragment());
  }
  
  public class Bridge {
    
    private Socket receiveSocket;
    private Socket sendSocket;
    private Pump requestThread;
    private Pump responseThread;
    
    public Bridge(Socket socketURI targetthrows Exception {
       = socket;
       = createSocket(target);
      if ( > 0) {
      }
      .connect(new InetSocketAddress(target.getHost(), target
          .getPort()));
      .info("proxy connection " +  + ", receiveBufferSize="
          + .getReceiveBufferSize());
    }
    
    public void goOn() {
      .goOn();
      .goOn();
    }
    
    public void pause() {
      .pause();
      .pause();
    }
    
    public void close() throws Exception {
      synchronized () {
        .remove(this);
      }
      .close();
      .close();
    }
    
    public void halfClose() throws Exception {
      .close();
    }
    
    private void linkWithThreads(Socket sourceSocket dest) {
       = new Pump(sourcedest);
      .start();
       = new Pump(destsource);
      .start();
    }
    
    public class Pump extends Thread {
      
      protected Socket src;
      private Socket destination;
      
      public Pump(Socket sourceSocket dest) {
        super("SocketProxy-DataTransfer-" + source.getPort() + ":"
            + dest.getPort());
         = source;
         = dest;
        .set(new CountDownLatch(0));
      }
      
      public void pause() {
        .set(new CountDownLatch(1));
      }
      
      public void goOn() {
        .get().countDown();
      }
      
      public void run() {
        byte[] buf = new byte[1024];
        try {
          InputStream in = .getInputStream();
          OutputStream out = .getOutputStream();
          while (true) {
            int len = in.read(buf);
            if (len == -1) {
              .debug("read eof from:" + );
              break;
            }
            .get().await();
            out.write(buf, 0, len);
          }
        } catch (Exception e) {
          .debug("read/write failed, reason: " + e.getLocalizedMessage());
          try {
            if (!.isClosed()) {
              // for halfClose, on read/write failure if we close the
              // remote end will see a close at the same time.
              close();
            }
          } catch (Exception ignore) {}
        }
      }
    }
  }
  
  public class Acceptor implements Runnable {
    
    private ServerSocket socket;
    private URI target;
    
    public Acceptor(ServerSocket serverSocketURI uri) {
       = serverSocket;
       = uri;
      .set(new CountDownLatch(0));
      try {
      } catch (SocketException e) {
        e.printStackTrace();
      }
    }
    
    public void pause() {
      .set(new CountDownLatch(1));
    }
    
    public void goOn() {
      .get().countDown();
    }
    
    public void run() {
      try {
        while (!.isClosed()) {
          .get().await();
          try {
            Socket source = .accept();
            .get().await();
            if ( > 0) {
              source.setReceiveBufferSize();
            }
            .info("accepted " + source + ", receiveBufferSize:"
                + source.getReceiveBufferSize());
            synchronized () {
              .add(new Bridge(source));
            }
          } catch (SocketTimeoutException expected) {}
        }
      } catch (Exception e) {
        .debug("acceptor: finished for reason: " + e.getLocalizedMessage());
      }
    }
    
    public void close() {
      try {
        .close();
        .countDown();
        goOn();
      } catch (IOException ignored) {}
    }
  }
  
New to GrepCode? Check out our FAQ X