Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source
   * Copyright 2005, JBoss Inc., and individual contributors as indicated
   * by the @authors tag. See the copyright.txt in the distribution for a
   * full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
  * the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
 package org.jboss.remoting.transport.multiplex;
 
 import  org.jboss.logging.Logger;
 
 import java.util.Map;
 import java.util.Set;
 
InputMultiplexor is one of the key Multiplex classes, responsible for demultiplexing multiple byte streams sharing a single TCP connection. It has two inner classes which can perform this function. MultiGroupInputThread can perform demultiplexing for any number of NIO sockets, taking advantage of the Selector facility. For non-NIO sockets, notably SSL sockets, SingleGroupInputThread handles demultiplexing for a single socket.

The data stream, created at the other end of the TCP connection by the OutputMultiplexor class, consists of a sequence of packets, each consisting of a header, giving version, destination virtual socket, and number of bytes. followed by the specified number of data bytes. (See OutputMultiplexor for the header format. Each of the demultiplexing thread classes reads a header and transfers the following bytes to the input stream of the target virtual socket.

Copyright (c) 2005

Deprecated:
As of release 2.4.0 the multiplex transport will no longer be actively supported.
Author(s):
Ron Sigal
 
 public class InputMultiplexor
 {
    protected static final Logger log = Logger.getLogger(InputMultiplexor.class);
    private static final int HEADER_LENGTH = 7;
 
    private int bufferSize;
    private int maxErrors;
 
 
    public InputMultiplexor(Map configuration)
    {
       
          = Multiplex.getOneParameter(configuration,
                                     "bufferSize",
                                     .,
                                     .);
 
       
          = Multiplex.getOneParameter(configuration,
                                      "maxErrors",
                                      .,
                                      .);
    }


   
Returns a MultiGroupInputThread designed to handle multiple virtual socket groups.

Parameters:
configuration
Returns:
a MultiGroupInputThread designed to handle multiple virtual socket groups
   {
      return new MultiGroupInputThread();
   }


   
Returns a SingleGroupInputThread designed to handle a single virtual socket group.

Returns:
a SingleGroupInputThread designed to handle a single virtual socket group
   {
      return new SingleGroupInputThread(managersocketos);
   }
   public class MultiGroupInputThread extends StoppableThread
   {
      private static final String errMsg1 = "An existing connection was forcibly closed by the remote host";
      private static final String errMsg2 = "An established connection was aborted by the software in your host machine";
      private Map managerProcessorMap;
      private Set socketGroupsToBeRegistered = new HashSet();
      private Set tempSocketGroupSet = new HashSet();
      private boolean socketGroupsAreWaiting;
      private Selector selector;
      private ByteBuffer buffer;
      private byte[] data;
      private boolean trace;
      private boolean debug;
      private boolean info;
      public MultiGroupInputThread() throws IOException
      {
          = Collections.synchronizedMap(new HashMap());
          = Selector.open();
          = ByteBuffer.allocate();
          = new byte[];
          = .isTraceEnabled();
          = .isDebugEnabled();
          = .isInfoEnabled();
      }


      
Registers manager and socket with NIO Selector

Parameters:
manager MultiplexingManager
Returns:
Throws:
IOException
      public void registerSocketGroup(MultiplexingManager managerthrows IOException
      {
         if (.debug(" accepting socket group for registration: " + manager);
         synchronized ()
         {
            .add(manager);
             = true;
         }
      }
      protected void doRegistration()
      {
         .clear();
         synchronized()
         {
            .clear();
             = false;
         }
         Iterator it = .iterator();
         while (it.hasNext())
         {
            MultiplexingManager manager = (MultiplexingManagerit.next();
            GroupProcessor groupProcessor = new GroupProcessor(manager);
            SelectableChannel channel = manager.getSocket().getChannel();
            try
            {
               SelectionKey key = channel.register(.groupProcessor);
               groupProcessor.setKey(key);
               .put(managergroupProcessor);
            }
            catch (IOException e)
            {
               // channel might be closed.
               .warn(e);
            }
         }
      }


      
Removes references to virtual socket group.

Parameters:
manager
      public void unregisterSocketGroup(MultiplexingManager manager)
      {
         // Leave GroupProcessor in Map until SelectionKey is cancelled.
         GroupProcessor groupProcessor = (GroupProcessor.get(manager);
         if(groupProcessor == null)
         {
            .debug("attempting to unregister unknown MultiplexingManager: " + manager);
            return;
         }
         SelectionKey key = groupProcessor.getKey();
         key.cancel();
         .remove(manager);
         if (.debug("unregistered socket group:" + manager);
      }
      public void shutdown()
      {
         // in case thread is still reading
         super.shutdown();
         try
         {
            .close();
         }
         catch (IOException e)
         {
            .error("unable to close selector"e);
         }
         interrupt();
      }
      protected void doInit()
      {
         .debug("MultiGroupInputThread thread starting");
      }
      protected void doRun()
      {
         .debug("entering doRun()");
         Set keys = null;
         try
         {
            while (true)
            {
               if (!)
                  return;
               if ()
                  doRegistration();
               .select(200);
               keys = .selectedKeys();
               if (!keys.isEmpty())
                  break;
            }
         }
         catch (IOException e)
         {
            .info(e);
         }
         catch (ClosedSelectorException e)
         {
            .info("Selector is closed: shutting down input thread");
            super.shutdown();
            return;
         }
         if ()
         {
            .trace("keys: " + .keys().size());
            .trace("selected keys: " + keys.size());
         }
         Iterator it = keys.iterator();
         while (it.hasNext())
         {
            SelectionKey key = (SelectionKeyit.next();
            it.remove();
            GroupProcessor groupProcessor = (GroupProcessorkey.attachment();
            if (groupProcessor == null)
            {
               if (key.isValid())
                  .error("valid SelectionKey has no attachment: " + key);
               continue;
            }
            groupProcessor.processChannel(key);
         }
      }
      protected void doShutDown()
      {
         .debug("MultiGroupInputThread shutting down");
      }
      class GroupProcessor
      {
         // Message header
         private byte[] b = new byte[];
         private int    headerCount;
         private byte   version;
         private int    destination;
         private short  size;
         private MultiplexingManager manager;
         private OutputStream   outputStream;
         private SelectionKey key;
         private int errorCount;
         public GroupProcessor(MultiplexingManager manager)
         {
            this. = manager;
         }
         public void processChannel(SelectionKey key)
         {
            .debug("processChannel()");
            SocketChannel channel = (SocketChannelkey.channel();
            .clear();
            try
            {
               if (channel.read() < 0)
                     throw new EOFException();
               .flip();
               if ()
                  .debug("read: " + .remaining());
               while (.hasRemaining())
               {
                  if ( <  ||  == 0)
                  {
                     // then prepare to process next virtual stream.
                     completeHeader();
                     if ( < )
                        return;
                     SocketId socketId = new SocketId();
                      = .getOutputStreamByLocalSocket(socketId);
                     if ( == null)
                     {
                        // We'll get an OutputStream to stash these bytes, just in case they
                        // are coming from a valid source and the local VirtualSocket is still
                        // getting set up.
                        .info("unknown socket id: " + );
                         = .getConnectedOutputStream(socketId);
                     }
                     if (!.hasRemaining())
                        return;
                  }
                  int n = Math.min(.remaining());
                  .get(, 0, n);
                  .write(, 0, n);
                  if ()
                  {
                     .trace("received " + n + " bytes for socket: " + );
                     for (int i = 0; i < ni++)
                        .trace("" + (0xff & [i]));
                  }
                   -= n;
                  if ( == 0)
                      = 0;
               }
            }
            catch (IOException e)
            {
               handleChannelException(ekeychannel);
            }
            catch (Throwable t)
            {
               .error("doRun()");
               .error(t);
            }
         }
         public SelectionKey getKey()
         {
            return ;
         }
         public void setKey(SelectionKey key)
         {
            this. = key;
         }
         private void completeHeader(ByteBuffer bbthrows IOException
         {
            int n = Math.min(bb.remaining(),  - );
            bb.get(n);
             += n;
            if ( == )
            {
                = [0];
                =                 ([1] << 24) | (0x00ff0000 & ([2] << 16)) |
                               (0x0000ff00 & ([3] << 8)) | (0x000000ff & [4]);
                = (short) ((0x0000ff00 & ([5] << 8)) | (0x000000ff & [6]));
               if (  < 0 ||  < )
                  throw new CorruptedStreamException("invalid chunk size read on: " +  + ": ");
               if ( != 0)
                  throw new CorruptedStreamException("invalid version read on: " +  + ": " + );
            }
         }
         private void handleChannelException(IOException eSelectionKey keySocketChannel channel)
         {
            try
            {
               if (!channel.isOpen())
               {
                  key.cancel();
                  return;
               }
               if (e instanceof EOFException)
               {
                  key.cancel();
                  .setEOF();
                  .debug(e);
                  return;
               }
               
               if (e instanceof SSLException)
               {
                  key.cancel();
                  .error(e);
                  return;
               }
               if (++ > )
                 {
                    .setReadException(e);
                    channel.close();
                    key.cancel();
                    .error(e);
                    .error("error count exceeds max errors: " + );
                    return;
                 }
               Socket socket = channel.socket();
               String message = e.getMessage();
               if (socket.isClosed() || socket.isInputShutdown() ||
                   .equals(message) || .equals(message) ||
                   e instanceof CorruptedStreamException)
               {
                  .setReadException(e);
                  channel.close();
                  key.cancel();
                  .info(e);
                  return;
               }
               // Haven't reached maxErrors yet
               .warn(e);
            }
            catch (IOException e2)
            {
               .error("problem closing channel: "  + e2);
            }
         }
         public int          getDestination()   {return ;}
         public short        getSize()          {return ;}
         public byte         getVersion()       {return ;}
         public OutputStream getOutputStream()  {return ;}
      }
   }
   {
      private InputStream is;
      private OutputStream currentOutputStream;
      private byte[] dataBytes = new byte[];
      private MultiplexingManager manager;
      private int dataInCount = 0;
      private int errorCount;
      private boolean eof;
      // Message header
      private byte[] headerBytes = new byte[];
      private int    headerCount;
      private byte   version;
      private int    destination;
      private short  size;
      private boolean trace;
      private boolean debug;
      private boolean info;
      public SingleGroupInputThread(MultiplexingManager managerSocket socketOutputStream os)
      throws IOException
      {
         this. = new BufferedInputStream(socket.getInputStream());
         this. = manager;
          = os;
          = .isTraceEnabled();
          = .isDebugEnabled();
          = .isInfoEnabled();
      }
      public void shutdown()
      {
         // in case thread is still reading
         super.shutdown();
         .info("interrupting input thread");
         interrupt();
      }


      
      protected void doInit()
      {
         .debug("SingleGroupInputThread thread starting");
      }


      
      protected void doRun()
      {
         try
         {
            // end of file
            if (!completeHeader())
            {
                = true;
               return;
            }
            SocketId socketId = new SocketId();
             = .getOutputStreamByLocalSocket(socketId);
            if ( == null)
            {
               // We'll get an OutputStream to stash these bytes, just in case they
               // are coming from a valid source and the local VirtualSocket is still
               // getting set up.
               .info("unknown socket id: " + );
                = .getConnectedOutputStream(socketId);
            }
            int bytesRead = 0;
            while (bytesRead < )
            {
               int n = .read(, 0,  - bytesRead);
               if (n < 0)
               {
                   = true;
                  return;
               }
               .write(, 0, n);
               bytesRead += n;
               if ()
               {
                  for (int i = 0; i < ni++)
                     .trace("" + [i]);
               }
            }
         }
         catch (SSLException e)
         {
            .debug(e.getMessage());
         }
         catch (EOFException e)
         {
             = true;
            .info("end of file");
         }
         catch (IOException e)
         {
            if (++ > )
            {
               .setReadException(e);
               super.shutdown();
               .error(e);
            }
            else
               .warn(e);
         }
         finally
         {
            if ()
            {
               super.shutdown();
               .setEOF();
            }
         }
      }
      private boolean completeHeader() throws IOException
      {
         while ( < )
         {
            int n = .read( - );
            // end of file
            if (n < 0)
               return false;
             += n;
         }
         // Reset for next header.
          = 0;
          = [0];
          =               ([1] << 24) | (0x00ff0000 & ([2] << 16)) |
                       (0x0000ff00 & ([3] << 8)) | (0x000000ff & [4]);
          = (short) ((0x0000ff00 & ([5] << 8)) | (0x000000ff & [6]));
         if ()
         {
            .trace("version:     " + );
            .trace("destination: " + );
            .trace("size:        " + );
         }
         if (  < 0 ||  < )
            throw new CorruptedStreamException("invalid chunk size read on: " +  + ": ");
         if ( != 0)
            throw new CorruptedStreamException("invalid version read on: " +  + ": " + );
         return true;
      }
      protected void doShutDown()
      {
         .debug("input thread: data bytes read: " + );
         .debug("input thread shutting down");
      }
   }
   private static class CorruptedStreamException extends IOException
   {
      CorruptedStreamException(String message) {super(message);}
   }
New to GrepCode? Check out our FAQ X