Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
  * JBoss, Home of Professional Open Source
  * Copyright 2005, JBoss Inc., and individual contributors as indicated
  * by the @authors tag. See the copyright.txt in the distribution for a
  * full listing of individual contributors.
  *
  * This is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as
  * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
 
 
 package org.jboss.remoting.transport.socket;
 
 import  org.jboss.logging.Logger;
 import  org.jboss.serial.io.JBossObjectInputStream;
 
 import java.util.Map;

This Thread object hold a single Socket connection to a client and is kept alive until a timeout happens, or it is aged out of the SocketServerInvoker's LRU cache.

There is also a separate thread pool that is used if the client disconnects. This thread/object is re-used in that scenario and that scenario only.

This is a customization of the same ServerThread class used witht the PookedInvoker. The custimization was made to allow for remoting marshaller/unmarshaller.

Author(s):
Bill Burke
Tom Elrod
Ovidiu Feodorov
Version:
$Revision: 5709 $
 
 public class ServerThread extends Thread
 {
    // Constants ------------------------------------------------------------------------------------
    
   
Key used to determine if thread should return to threadpool after SocketTimeoutException
 
    public static final String CONTINUE_AFTER_TIMEOUT = "continueAfterTimeout";
    
    final static private Logger log = Logger.getLogger(ServerThread.class);
 
    // Static ---------------------------------------------------------------------------------------
 
    private static boolean trace = .isTraceEnabled();
 
    private static int idGenerator = 0;
 
    public static synchronized int nextID()
    {
       return ++;
    }
 
    // Attributes -----------------------------------------------------------------------------------
 
    protected volatile boolean running;
    protected volatile boolean handlingResponse;
    protected volatile boolean shutdown;
   protected LRUPool clientpool;
   protected LinkedList threadpool;
   protected String serverSocketClassName;
   protected Class serverSocketClass;
   private Socket socket;
   private int timeout;
   private int writeTimeout;
   protected SocketServerInvoker invoker;
   protected SocketWrapper socketWrapper;
   protected Marshaller marshaller;
   protected UnMarshaller unmarshaller;
   // the unique identity of the thread, which won't change during the life of the thread. The
   // thread may get associated with different IP addresses though.
   private int id = .;
   // Indicates if will check the socket connection when getting from pool by sending byte over the
   // connection to validate is still good.
    private boolean shouldCheckConnection;
   // Will indicate when the last request has been processed (used in determining idle
   // connection/thread timeout)
   private long lastRequestHandledTimestamp = System.currentTimeMillis();
   
   private boolean reuseAfterTimeout;
   // Constructors ---------------------------------------------------------------------------------
   public ServerThread(Socket socketSocketServerInvoker invokerLRUPool clientpool,
                       LinkedList threadpoolint timeoutint writeTimeoutString serverSocketClassName)
      throws Exception
   {
      super();
       = true;
       = true// start off as true so that nobody can interrupt us
      setName(getWorkerThreadName(socket));
      this. = socket;
      this. = timeout;
      this. = writeTimeout;
      this. = serverSocketClassName;
      this. = invoker;
      this. = clientpool;
      this. = threadpool;
      processNewSocket();
      if (invoker != null)
      {
         Map configMap = invoker.getConfiguration();
         String checkValue = (String)configMap.get(.);
         if (checkValue != null && checkValue.length() > 0)
         {
             = Boolean.valueOf(checkValue).booleanValue();
         }
         else if (Version.getDefaultVersion() == .)
         {
             = true;
         }
      }
   }
   // Thread overrides -----------------------------------------------------------------------------
   public void run()
   {
      try
      {
         while (true)
         {
            dorun();
            // The following code has been changed to eliminate a race condition with
            // SocketServerInvoker.cleanup().
            //
            // A ServerThread can shutdown for two reasons:
            // 1. the client shuts down, and
            // 2. the server shuts down.
            //
            // If both occur around the same time, a problem arises.  If a ServerThread starts to
            // shut down because the client shut down, it will test shutdown, and if it gets to the
            // test before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown
            // to true, it will return itself to threadpool.  If it moves from clientpool to
            // threadpool at just the right time, SocketServerInvoker could miss it in both places
            // and never call stop(), leaving it alive, resulting in a memory leak.  The solution is
            // to synchronize parts of ServerThread.run() and SocketServerInvoker.cleanup() so that
            // they interact atomically.
            synchronized (this)
            {
               synchronized ()
               {
                  synchronized ()
                  {
                     if ()
                     {
                         = null;
                        return// exit thread
                     }
                     else
                     {
                        if() { .trace(this + " removing itself from clientpool and going to threadpool"); }
                        .remove(this);
                        .add(this);
                        Thread.interrupted(); // clear any interruption so that we can be pooled.
                        .notify();
                     }
                  }
               }
               while (true)
               {
                  try
                  {
                     if() { .trace(this + " begins to wait"); }
                     wait();
                     if() { .trace(this + " woke up after wait"); }
                     
                     break;
                  }
                  catch (InterruptedException e)
                  {
                     if ()
                     {
                         = null;
                        return// exit thread
                     }
                  }
               }
            }
         }
      }
      catch (Exception e)
      {
         .debug(this + " exiting run on exception, definitively thrown out of the threadpool"e);
      }
   }
   // Public ---------------------------------------------------------------------------------------
   public synchronized void wakeup(Socket socketint timeoutSocketServerInvoker invoker)
      throws Exception
   {
      // rename the worker thread to reflect the new socket it is handling
      setName(getWorkerThreadName(socket));
      this. = socket;
      this. = timeout;
      this. = invoker;
       = true;
       = true;
      processNewSocket();
      notify();
      if() { .trace(this + " has notified on mutex"); }
   }
   public long getLastRequestTimestamp()
   {
      return ;
   }
   public void shutdown()
   {
       = true;
       = false;
      // This is a race and there is a chance that a invocation is going on at the time of the
      // interrupt.  But I see no way right now to protect for this.
      // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
      // thread! (via LRUpool)
      if (!)
      {
         try
         {
            this.interrupt();
            Thread.interrupted(); // clear
         }
         catch (Exception ignored)
         {
         }
      }
   }

   
Sets if server thread should check connection before continue to process on next invocation request. If is set to true, will send an ACK to client to verify client is still connected on same socket.
   public void shouldCheckConnection(boolean checkConnection)
   {
      this. = checkConnection;
   }

   
Indicates if server will check with client (via an ACK) to see if is still there.
   public boolean getCheckingConnection()
   {
      return this.;
   }
   public void evict()
   {
       = false;
      // This is a race and there is a chance that a invocation is going on at the time of the
      // interrupt.  But I see no way right now to protect for this. There may not be a problem
      // because interrupt only effects threads blocking on IO.
      // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
      // thread! (via LRUpool)
      if (!)
      {
         try
         {
            this.interrupt();
            Thread.interrupted(); // clear
         }
         catch (Exception ignored)
         {
         }
      }
   }

   
This method is intended to be used when need to unblock I/O read, which the thread will automatically loop back to do after processing a request.
   public void unblock()
   {
      try
      {
         .close();
      }
      catch (IOException e)
      {
         .warn("Error closing socket when attempting to unblock I/O"e);
      }
   }
   public String toString()
   {
      return getName();
   }
   // Package protected ----------------------------------------------------------------------------
   // Protected ------------------------------------------------------------------------------------

   
This is needed because Object*Streams leak
   protected void dorun()
   {
      if() { .trace("beginning dorun()"); }
       = true;
       = true;
      // lazy initialize the socketWrapper on the worker thread itself. We do this to avoid to have
      // it done on the acceptor thread (prone to lockup)
      try
      {
         if() { .trace("creating the socket wrapper"); }
          =
         boolean valueSet = false;
         Map configMap = .getConfiguration();
         Object o = configMap.get();
         if (o != null)
         {
            try
            {
                = Boolean.valueOf((String)o).booleanValue();
               valueSet = true;
               .debug(this + " setting reuseAfterTimeout to " + );
            }
            catch (Exception e)
            {
               .warn(this + " could not convert " +  + 
                        " value of " + o + " to a boolean value");
            }
         }
         
         if (!valueSet)
         {
            if (.getInputStream() instanceof JBossObjectInputStream)
            {
                = true;
            }
         }
         
         // Always do first one without an ACK because its not needed
         if() { .trace("processing first invocation without acknowledging"); }
      }
      catch (Exception ex)
      {
         .error("Worker thread initialization failure"ex);
          = false;
      }
      catch (Error e)
      {
         if (!)
         {
            .error("error"e);
         }
         else
         {
            .debug("error"e);
         }
      }
      // Re-use loop
      while ()
      {
         try
         {
            acknowledge();
            processInvocation();
         }
         catch (AcknowledgeFailure e)
         {
            if (! && )
            {
               .trace("keep alive acknowledge failed!");
            }
             = false;
         }
         catch(SocketTimeoutException ste)
         {
            if(!)
            {
               if()
               {
                  .trace(ste);
               }
            }
            
            if (!)
                = false;
         }
         catch (InterruptedIOException e)
         {
            if (!)
            {
               .error("Socket IO interrupted"e);
            }
             = false;
         }
         catch (InterruptedException e)
         {
            if()
            {
               .trace(e);
            }
            if (!)
            {
               .error("interrupted"e);
            }
         }
         catch (EOFException eof)
         {
            if (! && )
            {
               .trace("EOFException received. This is likely due to client finishing communication."eof);
            }
             = false;
         }
         catch (SocketException sex)
         {
            if (! && )
            {
               .trace("SocketException received. This is likely due to client disconnecting and resetting connection."sex);
            }
             = false;
         }
         catch (Exception ex)
         {
            if (!)
            {
               .error("failed"ex);
                = false;
            }
         }
         catch (Error e)
         {
            if (!)
            {
               .error("error"e);
            }
            else
            {
               .debug("error"e);
            }
         }
         
         // clear any interruption so that thread can be pooled.
          = false;
         Thread.interrupted();
      }
      // Ok, we've been shutdown.  Do appropriate cleanups.
      // The stream close code has been moved to SocketWrapper.close().
//      try
//      {
//         if (socketWrapper != null)
//         {
//            InputStream in = socketWrapper.getInputStream();
//            if (in != null)
//            {
//               in.close();
//            }
//            OutputStream out = socketWrapper.getOutputStream();
//            if (out != null)
//            {
//               out.close();
//            }
//         }
//      }
//      catch (Exception ex)
//      {
//         log.debug("failed to close in/out", ex);
//      }
      try
      {
         if ( != null)
         {
            .debug(this + " closing socketWrapper: " + );
            .close();
         }
      }
      catch (Exception ex)
      {
         .error("failed to close socket wrapper"ex);
      }
       = null;
   }
   protected void processInvocation(SocketWrapper socketWrapperthrows Exception
   {
      if() { .trace("preparing to process next invocation invocation"); }
       = true;
      // Ok, now read invocation and invoke
      //TODO: -TME This needs to be done by ServerInvoker
      int version = Version.getDefaultVersion();
      boolean performVersioning = Version.performVersioning();
      InputStream inputStream = socketWrapper.getInputStream();
      if (performVersioning)
      {
         version = readVersion(inputStream);
         //TODO: -TME Should I be checking for -1?
         // This is a best attempt to determine if is old version.  Typically, the first byte will
         // be -1, so if is, will reset stream and process as though is older version.
         // Originally this code (now uncommented) and the other commented code was to try to make
         // so could automatically detect older version that would not be sending a byte for the
         // version.  However, due to the way the serialization stream manager handles the stream,
         // resetting it does not work, so will probably have to throw away that idea. However, for
         // now, am uncommenting this section because if are using the flag to turn off connection
         // checking (ack back to client), then will get a -1 when the client closes connection.
         // Then when stream passed onto the versionedRead, will get EOFException thrown and will
         // process normally (as though came from the acknowledge, as would have happened if
         // connection checking was turned on).  Am hoping this is not a mistake...
         if(version == -1)
         {
//            version = Version.VERSION_1;
            throw new EOFException();
         }
      }
      Object obj = versionedRead(inputStreamgetClass().getClassLoader(), version);
      // setting timestamp since about to start processing
      InvocationRequest req = null;
      boolean createdInvocationRequest = false;
      boolean isError = false;
      if(obj instanceof InvocationRequest)
      {
         req = (InvocationRequest)obj;
      }
      else
      {
         req = createInvocationRequest(objsocketWrapper);
         createdInvocationRequest = true;
         performVersioning = false;
      }
      Object resp = null;
      try
      {
         // Make absolutely sure thread interrupted is cleared.
         Thread.interrupted();
         if() { .trace("about to call " +  + ".invoke()"); }
         // handle socket-specific invocations
         if ("$GET_CLIENT_LOCAL_ADDRESS$".equals(req.getParameter()))
         {
            Socket s = socketWrapper.getSocket();
            InetAddress a = s.getInetAddress();
            resp = new InvocationResponse(req.getSessionId(), afalsenull);
         }
         else
         {
             // call transport on the subclass, get the result to handback
             resp = .invoke(req);
         }
         if() { .trace( + ".invoke() returned " + resp); }
      }
      catch (Throwable ex)
      {
         resp = ex;
         isError = true;
         if (.trace( + ".invoke() call failed"ex);
      }
      Thread.interrupted(); // clear interrupted state so we don't fail on socket writes
      if(isOneway(req.getRequestPayload()))
      {
         if() { .trace("oneway request, writing no reply on the wire"); }
      }
      else
      {
         if(!createdInvocationRequest)
         {
            // need to return invocation response
            if() { .trace("creating response instance"); }
            resp = new InvocationResponse(req.getSessionId(), respisErrorreq.getReturnPayload());
         }
         OutputStream outputStream = socketWrapper.getOutputStream();
         if (performVersioning)
         {
            writeVersion(outputStreamversion);
         }
         versionedWrite(outputStreamthis.getClass().getClassLoader(), respversion);
      }
       = false;
      // set the timestamp for last successful processed request
   }
   protected void acknowledge(SocketWrapper socketWrapperthrows Exception
   {
      if ()
      {
         // HERE IS THE RACE between ACK received and handlingResponse = true. We can't synchronize
         // because readByte blocks and client is expecting a response and we don't want to hang
         // client. See shutdown and evict for more details. There may not be a problem because
         // interrupt only effects threads blocking on IO. and this thread will just continue.
          = true;
         try
         {
            if() { .trace("checking connection"); }
            socketWrapper.checkConnection();
         }
         catch (EOFException e)
         {
            throw new AcknowledgeFailure();
         }
         catch (SocketException se)
         {
            throw new AcknowledgeFailure();
         }
         catch (IOException ioe)
         {
            throw new AcknowledgeFailure();
         }
          = false;
      }
   }
   protected Object versionedRead(InputStream inputStreamServerInvoker invoker,
                                  ClassLoader classLoaderint version)
   {
      //TODO: -TME - Should I even botther to check for version here?  Only one way to do processing
      //             at this point, regardless of version.
      switch (version)
      {
         case .:
         case .:
         case .:
         {
            if() { .trace("blocking to read invocation from unmarshaller"); }
            Object o = null;
            if ( instanceof VersionedUnMarshaller)
               o = ((VersionedUnMarshaller)).read(inputStreamnullversion);
            else
               o = .read(inputStreamnull);
            if() { .trace("read " + o + " from unmarshaller"); }
            return o;
         }
         default:
         {
            throw new IOException("Can not read data for version " + version +
               ".  Supported versions: " + . + "," + . + "," + .);
         }
      }
   }
   // Private --------------------------------------------------------------------------------------
   private SocketWrapper createServerSocketWrapper(Socket socketint timeoutMap metadata)
      throws Exception
   {
      if ( == null)
      {
         if( == null)
         {
             = ClassLoaderUtility.loadClass(getClass());
         }
         try
         {
             = .
               getConstructor(new Class[]{Socket.classMap.classInteger.class});
         }
         catch (NoSuchMethodException e)
         {
             = .getConstructor(new Class[]{Socket.class});
         }
      }
      SocketWrapper serverSocketWrapper = null;
      if (.getParameterTypes().length == 3)
      {
         Map localMetadata = null;
         if (metadata == null)
         {
            localMetadata = new HashMap(2);
         }
         else
         {
            localMetadata = new HashMap(metadata);
         }
         localMetadata.put(.);
         localMetadata.put(.);
         if ( > 0)
         {
            localMetadata.put(.new Integer());
         }
         
         serverSocketWrapper = (SocketWrapper).
            newInstance(new Object[]{socketlocalMetadatanew Integer(timeout)});
      }
      else
      {
         serverSocketWrapper =
            (SocketWrapper).newInstance(new Object[]{socket});
         serverSocketWrapper.setTimeout(timeout);
      }
      return serverSocketWrapper;
   }
   private boolean isOneway(Map metadata)
   {
      boolean isOneway = false;
      if (metadata != null)
      {
         Object val = metadata.get(.);
         if (val != null && val instanceof String && Boolean.valueOf((Stringval).booleanValue())
         {
            isOneway = true;
         }
      }
      return isOneway;
   }
   private InvocationRequest createInvocationRequest(Object objSocketWrapper socketWrapper)
   {
      if(obj instanceof InvocationRequest)
      {
         return (InvocationRequest)obj;
      }
      else
      {
         // need to wrap request with invocation request
         SocketAddress remoteAddress = socketWrapper.getSocket().getRemoteSocketAddress();
         return new InvocationRequest(remoteAddress.toString(),
                                      .getSupportedSubsystems()[0],
                                      objnullnullnull);
      }
   }
   private void processNewSocket()
   {
      InvokerLocator locator = .getLocator();
      ClassLoader classLoader = getClass().getClassLoader();
      String dataType = .getDataType();
      String serializationType = .getSerializationType();
      //TODO: -TME Need better way to get the unmarshaller (via config)
      Map configMap = null;
      if ( != null)
      {
         configMap = .getConfiguration();
      }
      boolean passConfigMapToMarshalFactory = false;
      if (configMap != null)
      {
         Object o = configMap.get(.);
         if (o instanceof String)
         {
            passConfigMapToMarshalFactory = Boolean.valueOf((Stringo).booleanValue();
         }
         else if (o != null)
         {
            .warn("Value of " + . + " should be of type String: " + o);
         }
      }
      Map map = passConfigMapToMarshalFactory ? configMap : null;
      if ( == null)
      {
          = MarshalFactory.getUnMarshaller(locatorclassLoadermap);
      }
      if ( == null)
      {
          = MarshalFactory.getUnMarshaller(dataTypeserializationType);
      }
      if ( == null)
      {
          = MarshalFactory.getMarshaller(locatorclassLoadermap);
      }
      if ( == null)
      {
          = MarshalFactory.getMarshaller(dataTypeserializationType);
      }
   }
   private void versionedWrite(OutputStream outputStreamSocketServerInvoker invoker,
                               ClassLoader classLoaderObject respint versionthrows IOException
   {
      //TODO: -TME - Should I ever worry about checking version here?  Only one way to send data at this point.
      switch (version)
      {
         case .:
         case .:
         case .:
         {
            if ( instanceof VersionedMarshaller)
               ((VersionedMarshaller).write(respoutputStreamversion);
            else
               .write(respoutputStream);
            if () { .trace("wrote response to the output stream"); }
            return;
         }
         default:
         {
            throw new IOException("Can not write data for version " + version +
               ".  Supported version: " + . + ", " + . + ", " + .);
         }
      }
   }
   private int readVersion(InputStream inputStreamthrows IOException
   {
      if() { .trace("blocking to read version from input stream"); }
      int version = inputStream.read();
      if() { .trace("read version " + version + " from input stream"); }
      return version;
   }
   private void writeVersion(OutputStream outputStreamint versionthrows IOException
   {
      outputStream.write(version);
   }
   private String getWorkerThreadName(Socket currentSocket)
   {
      if ( == .)
      {
          = nextID();
      }
      StringBuffer sb = new StringBuffer("WorkerThread#");
      sb.append().append('[');
      sb.append(currentSocket.getInetAddress().getHostAddress());
      sb.append(':');
      sb.append(currentSocket.getPort());
      sb.append(']');
      return sb.toString();
   }
   // Inner classes --------------------------------------------------------------------------------
   public static class AcknowledgeFailure extends Exception
   {
   }
New to GrepCode? Check out our FAQ X