// Start line:
// End line:
//
// Snippet Preview
//
// Snippet HTML Code
//
// Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source
   * Copyright 2005, JBoss Inc., and individual contributors as indicated
   * by the @authors tag. See the copyright.txt in the distribution for a
   * full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
  * the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
 /*
  * Created on Jul 23, 2005
  */
 
 package org.jboss.remoting.transport.multiplex;
 
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
 import java.util.Map;

 import org.jboss.logging.Logger;

 /**
  * <code>Protocol</code> is responsible for handling internal Multiplex messages. Some of
  * these, for example the messages involved in creating a new connection
  * (<code>acceptConnect()</code>, <code>connect()</code>, and <code>answerConnect()</code>),
  * are synchronous. Others, such as a request to shut down, are received asynchronously by
  * <code>Protocol.BackChannelThread</code>.
  * <p>
  * Copyright (c) 2005
  *
  * @author Ron Sigal
  * @deprecated As of release 2.4.0 the multiplex transport will no longer be actively supported.
  */
 
 public class Protocol
 {
    protected static final Logger log = Logger.getLogger(Protocol.class);
 
    // message types
    public static final int MP_CONNECT 	 	  		   = 0;
    public static final int MP_CONNECTED 	  		   = 1;
    public static final int MP_VERIFY_CONNECTION        = 2;
 // public static final int MP_INPUT_SHUTDOWN 		   = 3;
    public static final int MP_OUTPUT_SHUTDOWN 		   = 4;
    public static final int MP_DISCONNECT	  		   = 5;
    public static final int MP_REGISTER_REMOTE_SERVER   = 6;
    public static final int MP_UNREGISTER_REMOTE_SERVER = 7;
    public static final int MP_REQUEST_MANAGER_SHUTDOWN = 8;
    public static final int MP_ERROR 	 	  		   = 9;
    public static final int MP_TRUE					   = 10;
    public static final int MP_FALSE					   = 11;

   
InputStream used to receive synchronous messages
 
OutputStream used by connect() to communicate with a ServerSocket
 
OutputStream for sending messages to remote backChannelInputStream
 
 
    private boolean trace;
    private boolean debug;
    private boolean info;


   

Parameters:
virtualSelector
Returns:
 
    public static BackChannelThread getBackChannelThread(VirtualSelector virtualSelector)
    {
       return new BackChannelThread(virtualSelector);
    }


   

Parameters:
manager
Throws:
IOException
   public Protocol(MultiplexingManager managerthrows IOException
   {
       = .isTraceEnabled();
       = .isDebugEnabled();
       = .isInfoEnabled();
   }


   

Parameters:
is
socketId
Returns:
Throws:
IOException
   public SocketId connect(MultiplexingInputStream isSocketId socketIdthrows IOException
   {
      return connect(issocketId, 0);
   }


   

Parameters:
is
socketId
Returns:
Throws:
IOException
   public SocketId connect(MultiplexingInputStream isSocketId socketIdint timeoutthrows IOException
   {
      .debug("entering Protocol.connect()");
      long start = System.currentTimeMillis();
      int timeLeft = 0;
      int savedTimeout = is.getTimeout();
      synchronized ()
      {
         if (.debug("Protocol.connect(): wrote: CONNECT (" +  + ")");
         .writeInt(socketId.getPort());
         if (.debug("Protocol.connect(): wrote port: " + socketId.getPort());
      }
      try
      {
         if (timeout > 0)
            if ((timeLeft = timeout - (int)(System.currentTimeMillis() - start)) <= 0)
               throw new SocketTimeoutException();
         is.setTimeout(timeLeft);
         int messageType = is.read();
         if (.debug("Protocol.connect(): read message type: " + messageType);
         switch (messageType)
         {
            case :
               if (timeout > 0)
                  if ((timeLeft = timeout - (int) (System.currentTimeMillis() - start)) <= 0)
                     throw new SocketTimeoutException("connect timed out");
               is.setTimeout(timeLeft);
               int remotePort = is.readInt();
               if (.debug("Protocol.connect(): read port: " + remotePort);
               return new SocketId(remotePort);
            default:
               .error("Protocol.connect(): expecting a CONNECTED message: received: " + messageType);
            throw new IOException("Protocol.connect(): expecting a CONNECTED message: received: " + messageType);
         }
      }
      catch (SocketTimeoutException e)
      {
         .info("timeout in Protocol.connect()");
         throw e;
      }
      catch (Exception e)
      {
         .error(e);
         StackTraceElement[] stes = e.getStackTrace();
         for (int i = 0; i < stes.lengthi++)
            .error(stes[i].toString());
         throw new IOException(e.getMessage());
      }
      finally
      {
         is.setTimeout(savedTimeout);
      }
   }


   

Parameters:
is
timeout
Returns:
Throws:
IOException
   public SocketId acceptConnect(MultiplexingInputStream isint timeoutthrows IOException
   {
      .debug("entered acceptConnect()");
      long start = System.currentTimeMillis();
      int timeLeft = timeout;
      int savedTimeout = is.getTimeout();
      try
      {
         is.setTimeout(timeLeft);
         int messageType = is.read();
         if (.debug("Protocol.acceptConnect(): read message type: " + messageType);
         switch (messageType)
         {
            case :
               if (timeout > 0)
                  if ((timeLeft = timeout - (int)(System.currentTimeMillis() - start)) <= 0)
                     throw new SocketTimeoutException();
               is.setTimeout(timeLeft);
               int remotePort = is.readInt();
               if (.debug("Protocol.acceptConnect(): read port: " + remotePort);
               return new SocketId(remotePort);
            case -1:
               .info("Protocol.acceptConnect(): end of file");
               throw new EOFException();
            default:
               .error("Protocol.acceptConnect: expecting a CONNECT message: received: " + messageType);
            throw new IOException("Protocol.acceptConnect: expecting a CONNECT message: received: " + messageType);
         }
      }
      catch (SocketTimeoutException e)
      {
         .info("timeout in Protocol.acceptConnect()");;
         throw e;
      }
      finally
      {
         is.setTimeout(savedTimeout);
      }
   }


   

Parameters:
os
port
Throws:
IOException
   public void answerConnect(MultiplexingOutputStream osint portthrows IOException
   {
      os.write();
      if (.debug("Protocol.answerConnect(): wrote: CONNECTED (" +  + ")");
      os.writeInt(port);
      if (.debug("Protocol.answerConnect(): wrote port: " + port);
   }


   

Parameters:
socketId
   public void notifyOutputShutdown(SocketId socketId)
   {
      int port = socketId.getPort();
      try
      {
         synchronized ()
         {
            .write(port);
            .writeInt(portport);
         }
         if (.debug("Protocol.notifyOutputShutdown(): wrote: OUTPUT_SHUTDOWN (" +  + ") for port: " + port);
      }
      catch (IOException ignored)
      {
         .error("Protocol.notifyOutputShutdown(): unable to send MP_OUTPUT_SHUTDOWN message to port: " + port);
      }
   }


   

Parameters:
socketId
   public void disconnect(SocketId socketId)
   {
      int port = socketId.getPort();
      try
      {
         synchronized ()
         {
            .write(port);
            .writeInt(portport);
         }
         if (.debug("Protocol.disconnect(): wrote: DISCONNECT (" +  + ") for port: " + port);
      }
      catch (IOException ignored)
      {
         .error("Protocol.disconnect(): unable to send DISCONNECT message to port: " + port);
      }
   }


   

Parameters:
timeout
   public void registerRemoteServerSocket(int timeoutthrows IOException
   {
      int answer = ;
      synchronized ()
      {
         synchronized ()
         {
         }
         if (.debug("Protocol.registerRemoteServerSocket(): wrote: REGISTER_REMOTE_SERVER (" +  + ")");
         .setTimeout(timeout);
         answer = .read();
      }
      if (.debug("Protocol.registerRemoteServerSocket(): read: " + (answer ==  ? "true" : "false"));
      if (answer == )
         throw new IOException("unable to register remote socket");
   }


   
   public void unregisterRemoteServerSocket()
   {
      .debug("unregisterRemoteServerSocket()");
      try
      {
         synchronized ()
         {
         }
         if (.debug("Protocol.disconnect(): wrote: UNREGISTER_REMOTE_SERVER (" +  + ")");
      }
      catch (IOException ignored)
      {
         .error("Protocol.unregisterRemoteServerSocket(): unable to send UNREGISTER_REMOTE_SERVER");
      }
   }
   public boolean requestManagerShutdown(int timeoutthrows IOException
   {
      int b;
      synchronized ()
      {
         synchronized ()
         {
         }
         if (.debug("Protocol.requestManagerShutdown(): wrote: REQUEST_MANAGER_SHUTDOWN (" +  + ")");
         .setTimeout(timeout);
         b = .read();
      }
      boolean answer = (b == ) ? true : false;
      if (.debug("Protocol.requestManagerShutdown(): read: " + answer);
      return answer;
   }


   
   static class BackChannelThread extends StoppableThread
   {
      VirtualSocket socket;
      public BackChannelThread(VirtualSelector virtualSelector)
      {
         this. = virtualSelector;
      }


      
      public void shutdown()
      {
         .debug("back channel thread: beginning shut down");
         super.shutdown();
         .close();
         interrupt();
      }


      
      protected void doInit()
      {
         .debug("back channel thread starting");
      }


      
      protected void doRun()
      {
         MultiplexingManager manager = null;
         Map streamMap;
         int messageType;
         int port;
         int answer;
//         while (null == (streamMap = virtualSelector.select()) && running)
//            log.debug("select() loop");
//
//         if (!running)
//            return;
         streamMap = .select();
         if (streamMap == null)
            return;
         Iterator it = streamMap.keySet().iterator();
         while (it.hasNext())
         {
            try
            {
               MultiplexingInputStream is = (MultiplexingInputStreamit.next();
               if (is.available() == 0)
               {
                  .debug("available == 0");
                  .remove(is);
                  continue;
               }
               manager = (MultiplexingManagerstreamMap.get(is);
               if (manager == null)
                  continue;
               OutputStream os = manager.getBackchannelOutputStream();
               messageType = is.read();
               .debug("back channel thread: read message type: " + messageType);
               switch (messageType)
               {
                  case :
                     port = is.readInt();
                     if (.isDebugEnabled())
                        .debug("back channel thread: read OUTPUT_SHUTDOWN for port: " + port);
                      = manager.getSocketByLocalPort(new SocketId(port));
                     if ( == null)
                     {
                        .info("back channel thread (OUTPUT_SHUTDOWN): unable to retrieve socket at port: " + port);
                     }
                     else
                     {
                        .handleRemoteOutputShutDown();
                     }
                     break;
                  case :
                     port = is.readInt();
                     .debug("back channel thread: read DISCONNECT for port: " + port);
                      = manager.getSocketByLocalPort(new SocketId(port));
                     if ( == null)
                     {
                        .info("back channel thread (DISCONNECT): unable to retrieve socket at port: " + port);
                     }
                     else
                     {
                        .handleRemoteDisconnect();
                     }
                     break;
                  case :
                     // remote VirtualServerSocket is starting up
                     .debug("back channel thread: read REGISTER_REMOTE_SERVER");
                     answer = ;
                     try
                     {
                        manager.registerRemoteServerSocket();
                     }
                     catch (Exception e)
                     {
                        answer = ;
                        .info("back channel thread: unable to register remote server"e);
                     }
                     os.write(answer);
                     break;
                  case :
                     // remote VirtualServerSocket is shutting down
                     .debug("back channel thread: read UNREGISTER_REMOTE_SERVER");
                     manager.unRegisterRemoteServerSocket();
                     break;
                  case :
                     // remote MultiplexingManager is shutting down
                     .debug("back channel thread: read REQUEST_MANAGER_SHUTDOWN");
                     answer = manager.respondToShutdownRequest() ?  : ;
                     if (.isDebugEnabled()) .debug("back channel thread: writing " + answer);
                     os.write(answer);
                     break;
                  default:
                     .error("unexpected message type in back channel thread: " + messageType);
               }
            }
            catch (InterruptedIOException e)
            {
               if (isRunning())
                  .error("back channel thread: i/o interruption"e);
               else
                  .error("back channel thread: i/o interruption");
            }
            catch (IOException e)
            {
               if (isRunning())
               {
                  .error("back channel thread: i/o error: " + manager.getSocket().toString(), e);
               }
               else
                  .error("back channel thread: i/o error");
            }
         }
      }


      
      protected void doShutDown()
      {
         .debug("back channel thread shutting down");
      }
   }
// New to GrepCode? Check out our FAQ X