Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source
   * Copyright 2005, JBoss Inc., and individual contributors as indicated
   * by the @authors tag. See the copyright.txt in the distribution for a
   * full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
  * the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
 /*
  * Created on Jul 22, 2005
  */
 
 package org.jboss.remoting.transport.multiplex;
 
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;

 import org.jboss.logging.Logger;
 
 /**
  * <code>OutputMultiplexor</code> is one of the key Multiplex classes, responsible for
  * multiplexing multiple byte streams that share a single TCP connection. It has an
  * inner class that performs this function.
  * <p>
  * The data stream created here consists of a sequence of packets, each consisting of
  * a header, with the format:
  * <pre>
  *   byte:  version (current version is 0)
  *   int:   destination virtual socket id
  *   short: number of data bytes to follow
  * </pre>
  * followed by the number of data bytes specified in the header.
  * <p>
  * <code>OutputMultiplexor</code> has two fairness constraints that prevent one virtual
  * stream from starving the others:
  * <ol>
  *  <li><code>maxTimeSlice</code> determines the maximum time devoted to writing bytes
  *      for a given virtual connection before going on to process another virtual
  *      connection, and</li>
  *  <li><code>maxDataSlice</code> determines the maximum number of bytes written for a
  *      given virtual connection before going on to process another virtual
  *      connection.</li>
  * </ol>
  * <p>
  * For additional information about configuring <code>OutputMultiplexor</code>, please
  * see the documentation at labs.jboss.org.
  * <p>
  * Copyright (c) 2005
  *
  * @author Ron Sigal
  * @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
  */
 
 public class OutputMultiplexor
 {
    protected static final Logger log = Logger.getLogger(OutputMultiplexor.class);
 
    protected static final int BRACKETS_ALL = -1;
   protected static final int BRACKETS_NONE = -2;
   protected static final int HEADER_SIZE = 7;
   private int messagePoolSize;
   private int messageSize;
   private int maxChunkSize;
   private int maxTimeSlice;
   private int maxDataSlice;
   private int maxErrors;
   private Map configuration = new HashMap();
   private Map writeQueues = Collections.synchronizedMap(new HashMap());
   private Map readyQueues = Collections.synchronizedMap(new HashMap());
   private Map previousDestinationIds = Collections.synchronizedMap(new HashMap());
   private Set unregisteredClients = Collections.synchronizedSet(new HashSet());
   private List messagePool;
   private ByteBuffer buffer;
   private byte[] header = new byte[];
   private int errorCount;
   private boolean trace;
   private boolean debug;
   private boolean info;


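   /**
    * Creates an <code>OutputMultiplexor</code> configured from the supplied map.
    *
    * @param configuration configuration parameters; consulted for the keys
    *        "messagePoolSize", "messageSize", "maxChunkSize", "maxTimeSlice",
    *        "maxDataSlice" and "maxErrors"
    * @throws IOException declared by this constructor
    */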
   // NOTE(review): this copy of the file lost identifiers during extraction (the closing
   // parenthesis of the parameter list, assignment targets, logger receivers, and the
   // last two arguments of every Multiplex.getOneParameter() call), so the constructor
   // does not compile as shown. Restore the missing names from the original source.
   protected OutputMultiplexor(Map configurationthrows IOException
   {
      // Presumably copies the caller's settings into this.configuration -- TODO confirm.
      this..putAll(configuration);
      
      // Each call below reads one tunable by key. The assignment targets are presumably
      // the fields named like the keys (messagePoolSize, messageSize, ...) -- TODO confirm.
         = Multiplex.getOneParameter(configuration,
                                     "messagePoolSize",
                                     .,
                                     .);
      
         = Multiplex.getOneParameter(configuration,
                                     "messageSize",
                                     .,
                                     .);
      
         = Multiplex.getOneParameter(configuration,
                                     "maxChunkSize",
                                     .,
                                     .);
      
         = Multiplex.getOneParameter(configuration,
                                     "maxTimeSlice",
                                     .,
                                     .);
      
         = Multiplex.getOneParameter(configuration,
                                     "maxDataSlice",
                                     .,
                                     .);
      
         = Multiplex.getOneParameter(configuration,
                                  "maxErrors",
                                  .,
                                  .);
      // Report the effective settings at debug level.
      .debug("messagePoolSize: " + );
      .debug("messageSize:     " + );
      .debug("maxChunkSize:    " + );
      .debug("maxTimeSlice:    " + );
      .debug("maxDataSlice:    " + );
      .debug("maxErrors:       " + );
      // Pre-populate a synchronized list with Message objects (presumably messagePool,
      // with messagePoolSize entries -- TODO confirm).
       = Collections.synchronizedList(new ArrayList());
      for (int i = 0; i < i++)
         .add(new Message());
      // Allocate the ByteBuffer; the two summed operands are missing here (presumably
      // maxChunkSize and HEADER_SIZE, one data chunk plus its header -- TODO confirm).
       = ByteBuffer.allocate( + );
      // Presumably caches the logger's level checks in trace/debug/info -- TODO confirm.
       = .isTraceEnabled();
       = .isDebugEnabled();
       = .isInfoEnabled();
   }


   
   /**
    * A class implementing this interface can register to be notified when all of
    * its bytes have been processed.
    */
   public interface OutputMultiplexorClient
   {
      // Notification callback. In this file it is invoked by unregister() when the client
      // is unknown or has an empty write queue, and by the output thread once the write
      // queue it is processing for a client is empty.
      void outputFlushed();
   }


   /**
    * @return a new <code>OutputThread</code>
    */
   // NOTE(review): the method declaration that should precede this body is missing from
   // this copy of the file; restore it from the original source.
   {
      // Hands back a freshly constructed OutputThread on every call.
      return new OutputThread();
   }


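   /**
    * Queues content for transmission to a virtual socket.
    *
    * @param manager the <code>MultiplexingManager</code> whose write queue receives the content
    * @param socketId destination of the content
    * @param content bytes to send
    * @throws IOException declared by this method
    */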
   public void write(MultiplexingManager managerSocketId socketIdbyte[] content)
   throws IOException
   {
      write(managersocketIdcontent);
   }


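   /**
    * Queues content for transmission to a virtual socket, with an explicit brackets value.
    *
    * @param manager the <code>MultiplexingManager</code> whose write queue receives the content
    * @param socketId destination of the content
    * @param content bytes to send; an empty array is ignored
    * @param brackets ordering constraint for the queued content: <code>BRACKETS_ALL</code>,
    *        <code>BRACKETS_NONE</code>, or a destination id
    * @throws IOException declared by this method
    */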
   public void write(MultiplexingManager managerSocketId socketIdbyte[] contentint brackets)
   throws IOException
   {
      .debug("entering write()");
      if ()
      {
         String messageEnd = "";
         if (content.length > 0)
            messageEnd = ": [" + (0xff & content[0]) + "]";
         .trace("OutputMultiplexor.write(): queueing "
               + content.length + " bytes for \n  manager: " + manager
               + "\n  socket: " + socketId.getPort() + messageEnd);
      }
      if (content.length == 0)
         return;
      synchronized ()
      {
         List writeQueue = (List.get(manager);
         if (writeQueue == null)
         {
            .error("unregistered client: " + manager);
            return;
         }
         synchronized (writeQueue)
         {
            if (!writeQueue.isEmpty())
            {
               Message message = (MessagewriteQueue.get(writeQueue.size() - 1);
               if (message.getDestination().equals(socketId) && message.hasCompatibleBrackets(brackets))
               {
                  message.addContent(content);
               }
               else
                  writeQueue.add(getaMessage(socketIdcontentbrackets));
            }
            else
               writeQueue.add(getaMessage(socketIdcontentbrackets));
         }
         .put(managerwriteQueue);
         .notifyAll();
      }
   }


   /**
    * Allows an <code>OutputMultiplexorClient</code> to register to be notified when all
    * of its bytes have been processed.
    *
    * @param client the client to register
    */
   public void register(OutputMultiplexorClient client)
   {
      if (.debug("registering: " + client);
      synchronized ()
      {
         List writeQueue = Collections.synchronizedList(new LinkedList());
         .put(clientwriteQueue);
      }
   }


   /**
    * Unregisters an <code>OutputMultiplexorClient</code>.
    *
    * @param client the client to unregister
    */
   // NOTE(review): identifiers were lost from this copy of the file (the debug guard and
   // logger, the lock object, and the collections being read and modified), so the body
   // does not compile as shown; restore them from the original source.
   public void unregister(OutputMultiplexorClient client)
   {
      if (.debug("unregistering: " + client);
      synchronized ()
      {
         // Look up the client's write queue (presumably in writeQueues -- TODO confirm).
         List writeQueue = (List.get(client);
         if (writeQueue == null)
         {
            // Unknown client: it is still told that its output has been flushed.
            .debug("attempt to unregister unknown Listener: " + client);
            client.outputFlushed();
            return;
         }
         if (writeQueue.isEmpty())
         {
            // Nothing pending: remove the client from two collections and notify it now.
            .remove(client);
            .remove(client);
            client.outputFlushed();
         }
         else
         {
            // Output still pending: record the client (presumably in unregisteredClients)
            // so the output thread can notify it once its queue is empty -- TODO confirm.
            .add(client);
         }
      }
   }
   protected Message getaMessage(SocketId socketIdbyte[] contentint bracketsthrows IOException
   {
      Message m = null;
      if (.isEmpty())
         m = new Message();
      else
         m = (Message.remove(0);
      m.set(socketIdcontentbrackets);
      return m;
   }
   protected void releaseMessage(Message m)
   {
      if (.size() < )
      {
         .add(m);
      }
   }

   class OutputThread extends StoppableThread
   {
      // Logger for the output thread. It has the same name as the enclosing class's
      // static log field and therefore hides it inside this class.
      private final Logger log = Logger.getLogger(OutputMultiplexor.OutputThread.class);
      // Initialized to true; no assignment to it is visible in this copy of the file.
      private boolean socketIsOpen = true;
      // Presumably the per-round working copy of the ready queues used by doRun()
      // -- TODO confirm (the receivers in doRun() were lost in this copy).
      private Map localWriteQueues = new HashMap();
      // Presumably the Message currently being written by doRun() -- TODO confirm.
      private Message pendingMessage;
      // No construction-time work is needed.
      public OutputThread()
      {
      }


      
      /**
       * Shuts the thread down: delegates to the superclass and then interrupts this
       * thread.
       */
      public void shutdown()
      {
         super.shutdown();
         // doRun() may be blocked in wait(); on interrupt it returns if isRunning() is
         // false (presumably the state super.shutdown() establishes -- TODO confirm).
         interrupt();
      }
      protected void doInit()
      {
         .debug("output thread starting");
      }
      /**
       * Main loop of the output thread. While the thread is running it waits until some
       * queue is ready, takes a local copy of the ready queues, and then, for each
       * socket group, removes messages from its write queue and sends them through
       * encode(). A group is put aside (and its queue returned to the ready map) once a
       * time or data limit is exceeded, so that other groups get a turn.
       *
       * NOTE(review): many identifiers (logger receivers, the maps being locked and
       * modified, the limits being compared, the pending message) were lost from this
       * copy of the file, so the body does not compile as shown. Restore them from the
       * original source before changing any logic here.
       */
      protected void doRun()
      {
         while (isRunning())
         {
            .debug("STARTING new output round");
            // Presumably empties the local working copy left over from the previous
            // round -- TODO confirm.
            .clear();
            // Wait until there is a pending message in some socket group, then get a
            // local copy of the writeQueue Map.
            synchronized ()
            {
               while (.isEmpty())
               {
                  try
                  {
                     .debug("waiting");
                     .wait();
                  }
                  catch (InterruptedException e)
                  {
                     // An interrupt ends the thread only if it has been stopped;
                     // otherwise the wait loop resumes.
                     if (!isRunning())
                        return;
                  }
               }
               .putAll();
               .clear();
            }
            // Process each socket group that has a pending message.
            Iterator it = .keySet().iterator();
            while (it.hasNext())
            {
               try
               {
                  MultiplexingManager manager = (MultiplexingManagerit.next();
                  List writeQueue = (List.get(manager);
                  OutputStream os = manager.getOutputStream();
                  SocketId destination = null;
                  int dataOutCount = 0;
                  long startTime = System.currentTimeMillis();
                  // Process pending messages in one socket group.
                  while (!writeQueueisEmpty())
                  {
                     long timeSpent = System.currentTimeMillis() - startTime;
                     // Fairness check: the two limits are missing here (presumably
                     // maxTimeSlice and maxDataSlice -- TODO confirm).
                     if (timeSpent >  || dataOutCount > )
                     {
                        if ()
                        {
                           .debug("returning queue: data out: " + dataOutCount + ", time spent: " + timeSpent);
                        }
                        // Hand the unfinished queue back so that it is picked up again
                        // in a later round.
                        synchronized ()
                        {
                           .put(managerwriteQueue);
                        }
                        break;
                     }
                      = (MessagewriteQueue.remove(0);
                     destination =  .getDestination();
                     // The following code, which combines contiguous messages to the same
                     // destination, slightly degraded performance in tests.
//                     while (!writeQueue.isEmpty())
//                     {
//                        if (!destination.equals(((Message) writeQueue.get(0)).getDestination()))
//                              break;
//
//                        Message nextMessage = (Message) writeQueue.remove(0);
//                        int start = nextMessage.getStart();
//                        int length = nextMessage.getLength();
//                        pendingMessage.addContent(nextMessage.getContent(), start, length);
//                     }
                     int start = .getStart();
                     // At most one chunk is sent per pass; the upper bound is missing
                     // here (presumably maxChunkSize -- TODO confirm).
                     int length = Math.min(.getLength(), );
                     try
                     {
                        encode(destination.getContent(), start,
                               lengthosmanager.getSocket().getChannel());
                     }
                     catch (ClosedChannelException e)
                     {
                        // Channel is closed: discard everything queued for this group,
                        // record the failure on the manager, and stop processing it.
                        .info(e);
                        writeQueue.clear();
                        manager.setWriteException(e);
                        break;
                     }
                     catch (IOException e)
                     {
                        // Exceptions whose message matches one of these disconnect texts
                        // are handled like a closed channel.
                        String message = e.getMessage();
                        if ("An existing connection was forcibly closed by the remote host".equals(message) ||
                            "An established connection was aborted by the software in your host machine".equals(message) ||
                            "Broken pipe".equals(message) ||
                            "Connection reset".equals(message) ||
                            "Connection closed by remote host".equals(message) ||
                            "Socket is closed".equals(message)
                            )
                        {
                           .debug(e);
                           writeQueue.clear();
                           manager.setWriteException(e);
                           break;
                        }
                        // NOTE(review): both remaining branches rethrow; they differ only
                        // in the logging and the setWriteException() call.
                        else if (++ > )
                        {
                           .error(e);
                           manager.setWriteException(e);
                           throw e;
                        }
                        else
                        {
//                         Haven't reached maxErrors yet
                           throw e;
                        }
                     }
                     // If it's a long message with bytes left over, return to message queue.
                     if (length < .getLength())
                        returnLongMessageToQueue(writeQueue);
                     else
                        releaseMessage();
                     dataOutCount += length;
                      = null;
                     if ()
                        .trace("output thread wrote: " + length + " bytes to socket " + destination.getPort());
                  }
                  // A group whose queue is empty and which is found in the checked
                  // collection (presumably unregisteredClients -- TODO confirm) is
                  // cleaned up and told that its output has been flushed.
                  if (writeQueue.isEmpty() && .contains(manager))
                  {
                     .remove(writeQueue);
                     .remove(manager);
                     .remove(manager);
                     manager.outputFlushed();
                     continue;
                  }
                  .put(managerdestination);
                  if (interrupted()) // outside of writeQueue.take()
                     throw new InterruptedException();
               }
               catch (InterruptedException e)
               {
                  handleError("output thread: interrupted"e);
               }
               catch (SocketException e)
               {
                  handleError("output thread: socket exception"e);
               }
               catch (IOException e)
               {
                  handleError("output thread: i/o error"e);
               }
               finally
               {
                  // Indicate that messages for this socket group have been written.
                  it.remove();
               }
            }
         }
         // Final state report; the logged operands are missing in this copy.
         .debug("output thread: socketIsConnected: " + );
         .debug("output thread: running: " + );
         .debug("output thread: pendingMessage ==  " + );
      }


      
      protected void doShutDown()
      {
         .debug("output thread shutting down");
      }


      

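      /**
       * Writes one packet: a seven-byte header (version, destination port, data length)
       * followed by <code>length</code> data bytes.
       *
       * @param destination virtual socket the data is addressed to
       * @param bytes array containing the data
       * @param start offset in <code>bytes</code> of the first byte to send
       * @param length number of bytes to send
       * @param os stream written to when <code>channel</code> is null
       * @param channel channel written to when it is not null
       * @throws IOException if writing fails
       */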
      protected void encode(SocketId destinationbyte[] bytes,  int start,
                            int lengthOutputStream osSocketChannel channel)
      throws IOException
      {
         // Create header.
         int port = destination.getPort();
         // Set version.
         [0] = (byte) 0;
         // Set destination.
         [1] = (byte) ((port >>> 24) & 0xff);
         [2] = (byte) ((port >>> 16) & 0xff);
         [3] = (byte) ((port >>>  8) & 0xff);
         [4] = (byte) ( port         & 0xff);
         // Set size.
         [5] = (byte) ((length >> 8) & 0xff);
         [6] = (byte) ( length       & 0xff);
         if (channel == null)
         {
            os.write();
            os.write(bytesstartlength);
            os.flush();
         }
         else
         {
            .clear();
            .put();
            .put(bytesstartlength);
            .flip();
            while (.hasRemaining())
               channel.write();
         }
         if ()
         {
            .trace("encode(): wrote " + length + " bytes to: " + destination);
            .trace("header: " + [0] + " " +
                                   [1] + " " + [2] + " " + [3] + " " + [4] + " " +
                                   [5] + " " + [6]);
            for (int i = 0; i < lengthi++)
               .trace("" + (0xff & bytes[i]));
         }
      }
      protected void returnLongMessageToQueue(List writeQueueMessage pendingMessage)
      {
         SocketId destination = pendingMessage.getDestination();
         pendingMessage.markUsed();
         synchronized (writeQueue)
         {
            if (!writeQueue.isEmpty())
            {
               ListIterator lit = writeQueue.listIterator();
               boolean processed = false;
               int remotePort = destination.getPort();
               int brackets = pendingMessage.getBrackets();
               while (lit.hasNext())
               {
                  Message message = (Messagelit.next();
                  if (message.brackets(remotePort))
                  {
                     lit.previous();
                     lit.add(pendingMessage);
                     processed = true;
                     break;
                  }
                  if (message.getDestination().equals(destination)
                        && ( == message.getBrackets() || brackets == message.getBrackets()))
                  {
                     pendingMessage.addContent(message.getContent(), message.getStart(), message.getLength());
                     lit.set(pendingMessage);
                     processed = true;
                     break;
                  }
               }
               if (!processed)
               {
                  writeQueue.add(pendingMessage);
               }
            }
            else
            {
               writeQueue.add(pendingMessage);
            }
         }
      }


      

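      /**
       * Logs an error encountered by the output thread.
       *
       * @param message description of the error
       * @param e the exception that was caught
       */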
      // Logs an exception caught by doRun() while processing a socket group.
      // NOTE(review): identifiers were lost from this copy of the file (the reference
      // that is null-checked, the trace guard, the logger receivers, and the comma
      // between the two arguments); restore them from the original source.
      protected void handleError(String messageThrowable e)
      {
         if ( != null)
         {
            if (e instanceof InterruptedException)
            {
               // Interrupts are reported only through trace(), behind a guard
               // (presumably the trace flag -- TODO confirm).
               if ()
                  .trace(messagee);
            }
            else
               .error(messagee);
         }
      }
   }



   
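   /**
    * A <code>Message</code> holds the destination and content of a byte array destined
    * for the endpoint of a virtual connection.
    * <p>
    * It also has a variable <code>brackets</code> which can be used to indicate that
    * this <code>Message</code> should be sent after other <code>Message</code>s to a
    * given destination. There are three cases:
    * <table>
    *  <tr><th>value</th><th>meaning</th></tr>
    *  <tr><td><code>BRACKETS_ALL</code></td>
    *      <td>all other <code>Message</code>s should precede this one</td></tr>
    *  <tr><td><code>BRACKETS_NONE</code></td>
    *      <td>there are no constraints on this <code>Message</code></td></tr>
    *  <tr><td>any other integer x</td>
    *      <td>all other <code>Message</code>s to destination x should precede this
    *          <code>Message</code></td></tr>
    * </table>
    */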
   private static class Message
   {
      private SocketId socketId;
      private ByteArrayOutputStream baos;
      private int start;
      private int length;
      private int brackets;
      public Message(int size)
      {
          = new ByteArrayOutputStream(size);
      }
      public void set(SocketId socketIdbyte[] contentint bracketsthrows IOException
      {
         this. = socketId;
         .reset();
         .write(content);
          = 0;
          = content.length;
         this. = brackets;
      }
      public SocketId getDestination()
      {
         return ;
      }
      public byte[] getContent()
      {
         return .toByteArray();
      }
      public void addContent(byte[] bytesthrows IOException
      {
         .write(bytes);
          += bytes.length;
      }
      public void addContent(byte[] bytesint startint length)
      {
         .write(bytesstartlength);
         this. += length;
      }
      public int getStart()
      {
         return ;
      }
      public int getLength()
      {
         return ;
      }
      public int getBrackets()
      {
         return ;
      }
      public void markUsed(int used)
      {
          -= used;
         if ( <= 0)
         {
             = 0;
             = 0;
            .reset();
         }
         else
         {
             += used;
         }
      }
      public boolean brackets(int b)
      {
         if ( == )
            return true;
         if ( == )
            return false;
         return ( == b);
      }
      // Reports whether content with brackets value b may be appended to this Message;
      // write() uses the result to decide whether to extend the last queued Message.
      // NOTE(review): the compared field and constants were lost in this copy of the
      // file. The final test is presumably (brackets == b), but which BRACKETS_*
      // constant(s) the first test uses cannot be determined from here -- restore them
      // from the original source.
      public boolean hasCompatibleBrackets(int b)
      {
         if ( ==  || b == )
            return true;
         return ( == b);
      }
   }
   public int getMaxChunkSize()
   {
      return ;
   }
   public void setMaxChunkSize(int maxChunkSize)
   {
      this. = maxChunkSize;
   }
   public int getMessagePoolSize()
   {
      return ;
   }
   public void setMessagePoolSize(int messagePoolSize)
   {
      this. = messagePoolSize;
   }
   public int getMessageSize()
   {
      return ;
   }
   public void setMessageSize(int messageSize)
   {
      this. = messageSize;
   }
New to GrepCode? Check out our FAQ X