Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
  * JBoss, Home of Professional Open Source
  * Copyright 2005, JBoss Inc., and individual contributors as indicated
  * by the @authors tag. See the copyright.txt in the distribution for a
  * full listing of individual contributors.
  *
  * This is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as
  * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
 
 package org.jboss.remoting.transport.socket;
 
 import  org.jboss.util.propertyeditor.PropertyEditors;
 import  org.jboss.logging.Logger;
 
 import java.util.Map;
 import java.util.Set;
SocketServerInvoker is the server-side of a SOCKET based transport

Author(s):
Jeff Haynie
Tom Elrod
Ovidiu Feodorov
Version:
$Revision: 6291 $
Jmx:
mbean
 
 public class SocketServerInvoker extends ServerInvoker implements RunnableSocketServerInvokerMBean
 {
    private static final Logger log = Logger.getLogger(SocketServerInvoker.class);
 
    private static boolean trace = .isTraceEnabled();
 
    static int clientCount = 0;
 
    private Properties props = new Properties();
 
    private static int BACKLOG_DEFAULT = 200;
    protected static int MAX_POOL_SIZE_DEFAULT = 300;

   
Key for indicating if socket invoker should continue to keep socket connection between client and server open after invocations by sending a ping on the connection before being re-used. The default for this is false.
 
    public static final String CHECK_CONNECTION_KEY = "socket.check_connection";

   
Specifies the fully qualified class name for the custom SocketWrapper implementation to use on the server.
 
    public static final String SERVER_SOCKET_CLASS_FLAG = "serverSocketClass";
    protected String serverSocketClass = ServerSocketWrapper.class.getName();
 
    protected ServerSocket serverSocket = null;
    protected boolean running = false;
    protected int backlog = ;
    protected Thread[] acceptThreads;
    protected int numAcceptThreads = 1;
    protected int maxPoolSize = ;
    protected LRUPool clientpool;
    protected LinkedList threadpool;
    protected int acceptThreadPriorityIncrement = 0;
 
    protected boolean newServerSocketFactory = false;
    protected Object serverSocketFactoryLock = new Object();
 
    protected boolean reuseAddress = true;
 
    // defaults to -1 as to not have idle timeouts
    protected int idleTimeout = -1;
    protected IdleTimerTask idleTimerTask = null;
    
   protected int writeTimeout = -1;
   /**
    * Creates an invoker for the given locator.
    *
    * @param locator handed unchanged to the superclass constructor
    */
   public SocketServerInvoker(InvokerLocator locator)
   {
      super(locator);
   }
   public SocketServerInvoker(InvokerLocator locatorMap configuration)
   {
      super(locatorconfiguration);
   }

   
after a truststore update use this to set a new ServerSocketFactory to the invoker
then a new ServerSocket is created that accepts the new connections

Parameters:
serverSocketFactory
   public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory)
   {
      .trace("entering setNewServerSocketFactory()");
      synchronized ()
      {
         =true;
         setServerSocketFactory(serverSocketFactory);
         .notify();
         .info("ServerSocketFactory has been updated");
      }
   }

   
refreshes the serverSocket by closing old one and creating a new ServerSocket from new ServerSocketFactory

Throws:
IOException
   protected void refreshServerSocket() throws IOException
   {
      .trace("entering refreshServerSocket()");
      synchronized ()
      {
         // If release() is able to enter its synchronized block and sees 
         // serverSocket == null, then it knows that something went wrong.
         =false;
         ServerSocket oldServerSocket = ;
          = null;
         oldServerSocket.close();
         InetAddress bindAddress = InetAddress.getByName(getServerBindAddress());
         ServerSocket newServerSocket = createServerSocket(getServerBindPort(), bindAddress);
         newServerSocket.setReuseAddress();
          = newServerSocket;
         .info("ServerSocket has been updated");
      }
      .trace("leavinging refreshServerSocket()");
   }
   /**
    * Maps configuration onto this bean via PropertyEditors, runs the superclass setup,
    * and then applies a non-null {@code ssclass} value.
    * <p>
    * NOTE(review): this method is incomplete as extracted. The declaration of
    * {@code ssclass}, the assignment target inside the {@code if}, and the argument
    * separators of the mapJavaBeanProperties call are all missing. Presumably
    * {@code ssclass} is read from the configuration under SERVER_SOCKET_CLASS_FLAG and
    * assigned to {@code serverSocketClass} - confirm against the upstream file.
    *
    * @throws Exception propagated from the calls made here
    */
   protected void setup() throws Exception
   {
      PropertyEditors.mapJavaBeanProperties(thisfalse);
      super.setup();
      if(ssclass != null)
      {
          = ssclass;
      }
   }
   protected void finalize() throws Throwable
   {
      stop();
      super.finalize();
   }

   
Starts the invoker.

Jmx.managed:
operation description = "Start sets up the ServerInvoker we are wrapping." impact = "ACTION"
   /**
    * Starts the invoker.
    * <p>
    * As far as the extracted text shows, the method does four things in order:
    * <ol>
    * <li>inside a guarded block it resolves the bind address, creates the server socket via
    *     {@code createServerSocket(...)}, creates an {@code LRUPool}, a {@code LinkedList}
    *     and a {@code Thread[]}, and builds one acceptor {@code Thread} per slot, named by
    *     {@code getThreadName(i)};</li>
    * <li>it calls {@code super.start()}; an IOException from that call is logged and
    *     followed by {@code cleanup()}, and is not rethrown;</li>
    * <li>inside a second guarded block it starts the acceptor threads;</li>
    * <li>it cancels any existing idle timer task and, when the guarding value is positive,
    *     schedules a new {@code IdleTimerTask} through {@code TimerUtil}.</li>
    * </ol>
    * NOTE(review): most field references, several call arguments, loop bounds and commas
    * were stripped by the extraction (empty {@code if()} conditions, {@code = ;}
    * assignments). The guards are presumably {@code running}, {@code maxPoolSize} and
    * {@code idleTimeout} - confirm against the upstream file before repairing.
    *
    * @throws IOException if the bind address cannot be resolved or the server socket
    *                     cannot be created
    */
   public synchronized void start() throws IOException
   {
      if(!)
      {
         .debug(this + " starting");
         InetAddress bindAddress = InetAddress.getByName(getServerBindAddress());
         if( <= 0)
         {
            //need to reset to default
             = ;
         }
         try
         {
             = createServerSocket(getServerBindPort(), bindAddress);
            .setReuseAddress();
         }
         catch(IOException e)
         {
            .error("Error starting ServerSocket.  Bind port: " + getServerBindPort() +
               ", bind address: " + bindAddress);
            throw e;
         }
          = new LRUPool(2, );
         .create();
          = new LinkedList();
          = new Thread[];
         for(int i = 0; i < i++)
         {
            if() { .trace(this + " creating another AcceptThread"); }
            String name = getThreadName(i);
            [i] = new Thread(thisname);
            if() { .trace(this + " created and registered " + [i
                                       + " with priority " + [i].getPriority()); }
         }
      }
      try
      {
         super.start();
      }
      catch(IOException e)
      {
         // The failure is logged and the invoker cleaned up; the exception is not rethrown.
         .error("Error starting SocketServerInvoker."e);
         cleanup();
      }
      if(!)
      {
          = true;
         for(int i = 0; i < i++)
         {
            [i].start();
         }
      }
      if( > 0)
      {
         // Replace any previously scheduled idle timer task with a fresh one.
         if( != null)
         {
            .cancel();
         }
          = new IdleTimerTask();
         TimerUtil.schedule( * 1000);
      }
      else
      {
         if( != null)
         {
            .cancel();
         }
      }
      .debug(this + " started");
   }
   protected ServerSocket createServerSocket(int serverBindPort,
                                             int backlog,
                                             InetAddress bindAddressthrows IOException
   {
      return getServerSocketFactory().createServerSocket(serverBindPortbacklogbindAddress);
   }
   protected String getThreadName(int i)
   {
      return "AcceptorThread#" + i + ":" + getServerBindPort();
   }
   /**
    * Destroys a member of this invoker when it is non-null, then delegates to the
    * superclass.
    * <p>
    * NOTE(review): the member being null-checked and destroyed was stripped by the
    * extraction; presumably it is {@code clientpool} - confirm against the upstream file.
    */
   public void destroy()
   {
      if( != null)
      {
         .destroy();
      }
      super.destroy();
   }

   
Stops the invoker.

Jmx.managed:
operation description = "Stops the invoker." impact = "ACTION"
   public synchronized void stop()
   {
      if()
      {
         cleanup();
      }
      super.stop();
   }
   /**
    * Releases the resources held by a running invoker.
    * <p>
    * As far as the extracted text shows, it resets two fields, interrupts each element of
    * an array, shuts down every {@code ServerThread} returned by {@code getContents()},
    * flushes and stops that pool, shuts down every {@code ServerThread} removed from a
    * second collection, and finally closes something while ignoring any exception.
    * <p>
    * NOTE(review): nearly all field references were stripped by the extraction. The
    * operands are presumably {@code running}, {@code maxPoolSize}, {@code acceptThreads},
    * {@code clientpool}, {@code threadpool} and {@code serverSocket} - confirm against the
    * upstream file before repairing.
    */
   protected void cleanup()
   {
       = false;
       = 0; // so ServerThreads don't reinsert themselves
      if( != null)
      {
         for(int i = 0; i < .i++)
         {
            try
            {
               [i].interrupt();
            }
            catch(Exception ignored)
            {
               // best effort: a failed interrupt must not stop the rest of the cleanup
            }
         }
      }
      // The following code has been changed to avoid a race condition with ServerThread.run() which
      // can result in leaving ServerThreads alive, which causes a memory leak.
      if ( != null)
      {
         synchronized ()
         {
            Set svrThreads = .getContents();
            Iterator itr = svrThreads.iterator();
            while(itr.hasNext())
            {
               Object o = itr.next();
               ServerThread st = (ServerThreado;
               st.shutdown();
            }
            .flush();
            .stop();
            if ( != null)
            {
               synchronized()
               {
                  // The size is captured once; exactly that many entries are removed and shut down.
                  int threadsToShutdown = .size();
                  for(int i = 0; i < threadsToShutdowni++)
                  {
                     ServerThread thread = (ServerThread.removeFirst();
                     thread.shutdown();
                  }
               }
            }
         }
      }
      try
      {
         .close();
      }
      catch(Exception e)
      {
         // deliberately ignored: nothing more can be done if the close fails
      }
   }

   
Indicates if SO_REUSEADDR is enabled on server sockets Default is true.
   public boolean getReuseAddress()
   {
      return ;
   }

   
Sets if SO_REUSEADDR is enabled on server sockets. Default is true.

Parameters:
reuse
   public void setReuseAddress(boolean reuse)
   {
      this. = reuse;
   }

   

Returns:
Number of idle ServerThreads
Jmx:
managed-attribute
   public int getCurrentThreadPoolSize()
   {
      return .size();
   }

   

Returns:
Number of ServerThreads currently executing or waiting on an invocation
Jmx:
managed-attribute
   public int getCurrentClientPoolSize()
   {
      return .size();
   }

   
Getter for property numAcceptThreads

Returns:
The number of threads that exist for accepting client connections
Jmx:
managed-attribute
   public int getNumAcceptThreads()
   {
      return ;
   }

   
Setter for property numAcceptThreads

Parameters:
size The number of threads that exist for accepting client connections
Jmx:
managed-attribute
   public void setNumAcceptThreads(int size)
   {
      this. = size;
   }

   
Getter for max pool size. The number of server threads for processing client. The default is 300.

Returns:
Jmx:
managed-attribute
   public int getMaxPoolSize()
   {
      return ;
   }

   
The number of server threads for processing client. The default is 300.

Parameters:
maxPoolSize
Jmx:
managed-attribute
   public void setMaxPoolSize(int maxPoolSize)
   {
      this. = maxPoolSize;
   }

   

Jmx:
managed-attribute
   public int getBacklog()
   {
      return ;
   }

   

Jmx:
managed-attribute
   public void setBacklog(int backlog)
   {
      if(backlog < 0)
      {
         this. = ;
      }
      else
      {
         this. = backlog;
      }
   }
   public int getIdleTimeout()
   {
      return ;
   }

   
Sets the timeout for idle threads to be removed from pool. If the value is greater than 0, then idle timeout will be activated, otherwise no idle timeouts will occur. By default, this value is -1.

Parameters:
idleTimeout number of seconds before an idle thread is timed out.
   public void setIdleTimeout(int idleTimeout)
   {
      this. = idleTimeout;
      if(isStarted())
      {
         if(idleTimeout > 0)
         {
            if( != null)
            {
               .cancel();
            }
             = new IdleTimerTask();
            TimerUtil.schedule(idleTimeout * 1000);
         }
         else
         {
            if( != null)
            {
               .cancel();
            }
         }
      }
   }
   
   public int getWriteTimeout()
   {
      return ;
   }
   public void setWriteTimeout(int writeTimeout)
   {
      this. = writeTimeout;
   }
   // NOTE(review): as extracted this is an empty instance-initializer block. It is
   // presumably the body of a method whose header and statements were lost (possibly a
   // getter paired with setAcceptThreadPriorityIncrement) - confirm against upstream.
   {
   }
   /**
    * Stores the acceptor-thread priority increment when the resulting priority is in
    * range; otherwise logs a warning and leaves the current value unchanged.
    * <p>
    * NOTE(review): the base priority and both range bounds were stripped by the
    * extraction (only a "." remains). They are presumably constants of
    * {@code java.lang.Thread} - confirm against the upstream file.
    *
    * @param acceptThreadPriorityIncrement value added to the base priority
    */
   public void setAcceptThreadPriorityIncrement(int acceptThreadPriorityIncrement)
   {
      int resultingPriority = . + acceptThreadPriorityIncrement;
      if (resultingPriority < . || resultingPriority > .)
      {
         .warn(this + " resulting priority out of range: " + resultingPriority);
      }
      else
      {
         this. = acceptThreadPriorityIncrement;
      }
   }
   /**
    * Accept loop executed by each acceptor thread.
    * <p>
    * A daemon {@code ServerSocketRefresh} thread is started first. Each loop iteration
    * calls {@code thread.release()}, accepts a socket and hands it to
    * {@code processInvocation(socket)}. An {@code SSLException} is logged and ends the
    * method; an {@code InvalidStateException} is logged and followed by {@code stop()};
    * any other Throwable is logged at error or debug level depending on a stripped
    * condition. The refresh thread is shut down in the {@code finally} block.
    * <p>
    * NOTE(review): the loop condition, the trace guards, the logger and the accept()
    * receiver were stripped by the extraction; presumably {@code running}, {@code trace},
    * {@code log} and {@code serverSocket} - confirm against the upstream file.
    */
   public void run()
   {
      if() { .trace(this + " started execution of method run()"); }
      ServerSocketRefresh thread = new ServerSocketRefresh();
      thread.setDaemon(true);
      thread.start();
      try
      {
         while()
         {
            try
            {
               thread.release(); //goes on if serversocket refresh is completed
               if() { .trace(this + " is going to wait on serverSocket.accept()"); }
               Socket socket = .accept();
               if() { .trace(this + " accepted " + socket); }
               // the acceptor thread should spend as little time as possible doing any kind of
               // operation, and under no circumstances should perform IO on the new socket, which
               // can potentially block and lock up the server. For this reason, the acceptor thread
               // should grab a worker thread and delegate all subsequent work to it. This is what
               // processInvocation() does.
               processInvocation(socket);
            }
            catch (SSLException e)
            {
               .error("SSLServerSocket error"e);
               return;
            }
            catch (InvalidStateException e)
            {
               .error("Cannot proceed without functioning server socket.  Shutting down");
               stop();
            }
            catch(Throwable ex)
            {  
               if()
               {
                  .error(this + " failed to handle socket"ex);
               }
               else
               {
                  .debug(this + " caught exception in run()"ex);     
               }
            }
         }
      }
      finally
      {
         thread.shutdown();
      }
   }


   
The acceptor thread should spend as little time as possible doing any kind of operation, and under no circumstances should perform IO on the new socket, which can potentially block and lock up the server. For this reason, the acceptor thread should grab a worker thread and delegate all subsequent work to it.
   /**
    * Hands an accepted socket to a worker {@code ServerThread}.
    * <p>
    * The loop first tries to take a worker from a pool (the trace messages call it
    * "threadpool"). If none is available it either creates a new {@code ServerThread}
    * while a size check passes, or evicts from a second pool (the trace messages call it
    * "clientpool") and waits to be notified. The chosen worker is then inserted into a
    * pool, and is either started (new thread) or woken up with the socket (reused
    * thread).
    * <p>
    * NOTE(review): the lock objects, pool receivers, trace guards, several call arguments
    * and the closing parenthesis of the parameter list were stripped by the extraction -
    * confirm every operand against the upstream file before repairing.
    *
    * @throws Exception propagated from worker creation, waiting or wake-up
    */
   protected void processInvocation(Socket socketthrows Exception
   {
      ServerThread worker = null;
      boolean newThread = false;
      while(worker == null)
      {
         if() { .trace(this + " trying to get a worker thread from threadpool for processing"); }
         synchronized()
         {
            if(.size() > 0)
            {
               worker = (ServerThread).removeFirst();
               if() { .trace(this + (worker == null ? " found NO threads in threadpool" : " got " + worker + " from threadpool")); }
            }
            else if () { { .trace(this + " has an empty threadpool"); } }
         }
         if(worker == null)
         {
            synchronized()
            {
               if(.size() < )
               {
                  if() { .trace(this + " creating new worker thread"); }
                  worker = new ServerThread(socketthis,
                                            getTimeout(), );
                  worker.setPriority(.);
                  if() { .trace(this + " created " + worker + " with priority " + worker.getPriority()); }
                  newThread = true;
               }
               if(worker == null)
               {
                  // No capacity for a new worker: ask for an eviction, then wait for a notification.
                  if() {.trace(this + " trying to evict a thread from clientpool"); }
                  .evict();
                  if() {.trace(this + " waiting for a thread from clientpool"); }
                  .wait();
                  if() { .trace(this + " notified of clientpool thread availability"); }
               }
            }
         }
      }
      synchronized()
      {
         .insert(workerworker);
      }
      if(newThread)
      {
         if() {.trace(this + " starting " + worker); }
         worker.start();
      }
      else
      {
         if() { .trace(this + " reusing " + worker); }
         worker.wakeup(socketgetTimeout(), this);
      }
   }

   
Returns true if the transport is bi-directional in nature; for example, SOAP is unidirectional and SOCKETs are bi-directional (unless behind a firewall, for example).
   /**
    * Reports whether this transport is bi-directional.
    *
    * @return always {@code true} for this invoker
    */
   public boolean isTransportBiDirectional()
   {
      return true;
   }
   public String toString()
   {
      return "SocketServerInvoker[" +
         ( == null ?
            "UNINITIALIZED" :
            .getInetAddress().getHostAddress() + ":" + .getLocalPort()) +
         "]";
   }

   
Each implementation of the remote client invoker should have a default data type that it uses in the case it is not specified in the invoker locator uri.
   /**
    * Returns the default data type used by this invoker.
    * <p>
    * NOTE(review): the returned expression was stripped by the extraction (only "."
    * remains). It is presumably a static constant of a marshaller class - confirm against
    * the upstream file.
    *
    * @return the default data type
    */
   protected String getDefaultDataType()
   {
      return .;
   }

   
this thread checks if a new ServerSocketFactory was set,
if so initializes a serversocket refresh

Author(s):
Michael Voss
   /**
    * Thread that checks whether a new ServerSocketFactory was set and, if so, triggers a
    * server socket refresh through {@code refreshServerSocket()}.
    * <p>
    * NOTE(review): the loop condition, the lock object, the flag being tested and the
    * logger were stripped by the extraction. Presumably they are this class's own
    * {@code running} flag plus the enclosing invoker's {@code serverSocketFactoryLock},
    * {@code newServerSocketFactory} and {@code log} - confirm against the upstream file.
    */
   public class ServerSocketRefresh extends Thread
   {  
      // This declaration shadows the enclosing invoker's field of the same name.
      private boolean running = true;
      
      public ServerSocketRefresh()
      {
         super("ServerSocketRefresh");
      }
      
      /**
       * Loops: performs a refresh when the (stripped) condition holds, then waits on the
       * (stripped) lock until notified.
       */
      public void run()
      {
         while()
         {
            synchronized ()
            {  
               if()
               {
                  .debug("got notice about new ServerSocketFactory");
                  try
                  {
                     .debug("refreshing server socket");
                     refreshServerSocket();
                  } catch (IOException e)
                  {
                     .debug("could not refresh server socket");
                     .debug("message is: "+e.getMessage());
                  }
                  // This message is logged whether or not the refresh above succeeded.
                  .debug("server socket refreshed");
               }
               
               try
               {
                  .wait();
                  .trace("ServerSocketRefresh thread woke up");
               }
               catch (InterruptedException e)
               {
                  // The interrupt is ignored; the loop re-checks its condition.
               }
            }
         }
      }

      
      /**
       * Let SocketServerInvoker.run() resume when refresh is completed.
       *
       * @throws InvalidStateException when the (stripped) null check succeeds, with the
       *                               message "error refreshing ServerSocket"
       */
      public void release() throws InvalidStateException
      {
         synchronized ()
         {
            if ( == null)
            {
               throw new InvalidStateException("error refreshing ServerSocket");
            }
            .trace("passed through ServerSocketRefresh.release()");
         }
      }
      
      /**
       * Clears a (stripped) flag and notifies the (stripped) lock so a waiting run() wakes up.
       */
      public void shutdown()
      {
          = false;
         
         synchronized ()
         {
            .notify();
         }
      }
   }

   
The IdleTimerTask is used to periodically check the server threads to see if any have been idle for a specified amount of time, and if so, release those threads and their connections and clear from the server thread pool.
   /**
    * The IdleTimerTask is used to periodically check the server threads to see if any have
    * been idle for a specified amount of time, and if so, release those threads and their
    * connections and clear them from the server thread pool.
    * <p>
    * NOTE(review): the pool and lock receivers, the trace guards, the logger and the
    * loop-increment separators were stripped by the extraction. The first pass presumably
    * works on {@code clientpool} and the second on {@code threadpool} - confirm against
    * the upstream file.
    */
   public class IdleTimerTask extends TimerTask
   {
      /**
       * Runs two passes with the same rule: a ServerThread whose idle time
       * (current time minus {@code getLastRequestTimestamp()}) exceeds
       * {@code getIdleTimeout() * 1000} is removed from its pool and shut down.
       */
      public void run()
      {
         Object[] svrThreadArray = null;
         // Snapshot the pool contents under the lock; the scan itself runs outside it.
         synchronized()
         {
            Set svrThreads = .getContents();
            svrThreadArray = svrThreads.toArray();
         }
         if()
         {
            if(svrThreadArray != null)
               {
                  .trace("Idle timer task fired.  Number of ServerThreads = " + svrThreadArray.length);
               }
         }
         // iterate through pooled server threads and evict idle ones
         if(svrThreadArray != null)
         {
            long currentTime = System.currentTimeMillis();
            for(int x = 0; x < svrThreadArray.lengthx++)
            {
               ServerThread svrThread = (ServerThread)svrThreadArray[x];
               // check the idle time and evict
               long idleTime = currentTime - svrThread.getLastRequestTimestamp();
               if()
               {
                  .trace("Idle time for ServerThread (" + svrThread + ") is " + idleTime);
               }
               long idleTimeout = getIdleTimeout() * 1000;
               if(idleTime > idleTimeout)
               {
                  if()
                  {
                     .trace("Idle timeout reached for ServerThread (" + svrThread + ") and will be evicted.");
                  }
                  synchronized ()
                  {
                     .remove(svrThread);
                     .notify();
                  }
                  svrThread.shutdown();
               }
            }
         }
         // now check idle server threads in the thread pool
         svrThreadArray = null;
         synchronized()
         {
            if(.size() > 0)
            {
               // now need to check the thread pool to remove threads
               svrThreadArray = .toArray();
            }
         }
         if()
         {
            if(svrThreadArray != null)
            {
               .trace("Number of ServerThread in thead pool = " + svrThreadArray.length);
            }
         }
         if(svrThreadArray != null)
         {
            long currentTime = System.currentTimeMillis();
            for(int x = 0; x < svrThreadArray.lengthx++)
            {
               ServerThread svrThread = (ServerThread)svrThreadArray[x];
               long idleTime = currentTime - svrThread.getLastRequestTimestamp();
               if()
               {
                  .trace("Idle time for ServerThread (" + svrThread + ") is " + idleTime);
               }
               long idleTimeout = getIdleTimeout() * 1000;
               if(idleTime > idleTimeout)
               {
                  if()
                  {
                     .trace("Idle timeout reached for ServerThread (" + svrThread + ") and will be removed from thread pool.");
                  }
                  synchronized()
                  {
                     .remove(svrThread);
                  }
                  svrThread.shutdown();
               }
            }
         }
      }
   }
New to GrepCode? Check out our FAQ X