Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
  * JBoss, Home of Professional Open Source
  * Copyright 2005, JBoss Inc., and individual contributors as indicated
  * by the @authors tag. See the copyright.txt in the distribution for a
  * full listing of individual contributors.
  *
  * This is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as
  * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
 
 package org.jboss.remoting.transport.rmi;
 
 import  org.jboss.logging.Logger;
 import  org.jboss.util.threadpool.BasicThreadPool;
 import  org.jboss.util.threadpool.BlockingMode;
 import  org.jboss.util.threadpool.RunnableTaskWrapper;
 import  org.jboss.util.threadpool.Task;
 import  org.jboss.util.threadpool.ThreadPool;
 
 import java.util.List;
 import java.util.Map;

 /**
  * RMIClientInvoker
  *
  * @author Jeff Haynie
  * @author Tom Elrod
  * @version $Revision: 4177 $
  */
 
 public class RMIClientInvoker extends RemoteClientInvoker
 {
   
    /**
     * Key for the configuration map that determines the thread pool size for
     * simulated timeouts.
     */
 
    public static final String MAX_NUM_TIMEOUT_THREADS = "maxNumTimeoutThreads";

   
    /**
     * Key for the configuration map that determines the queue size for the
     * simulated timeout thread pool.
     */
 
    public static final String MAX_TIMEOUT_QUEUE_SIZE = "maxTimeoutQueueSize";
   
   
    /**
     * Specifies the default number of worker threads in the thread pool used for
     * simulating timeouts.
     */
 
    public static final int MAX_NUM_TIMEOUT_THREADS_DEFAULT = 10;
    
    private static final Logger log = Logger.getLogger(RMIClientInvoker.class);
    private static final boolean isTraceEnabled = .isTraceEnabled();
    
    private RMIServerInvokerInf server;
    
    private Object timeoutThreadPoolLock = new Object();
    private ThreadPool timeoutThreadPool;
   
   protected boolean rmiOnewayMarshalling;

   
   /**
    * Flag indicating whether the registry lookup has succeeded and the server stub has
    * been set. This cannot be done in the constructor, because a CannotConnectException
    * needs to be thrown in order to support clustering.
    *
    * (Constructor parameter: locator.)
    */
   private boolean connected = false;
   public RMIClientInvoker(InvokerLocator locator)
   {
      super(locator);
      configureParameters();
   }
   public RMIClientInvoker(InvokerLocator locatorMap configuration)
   {
      super(locatorconfiguration);
      configureParameters();
   }
   
   /**
    * Reads invoker settings from a parameter map and applies them to this invoker.
    * <p>
    * As visible here: the method returns immediately when the map is null. Otherwise it
    * inspects a value {@code val}. A String is converted with
    * {@code Boolean.valueOf(String)} and, per the debug message, becomes the
    * rmiOnewayMarshalling setting. Any other non-null value only produces a warning.
    * <p>
    * NOTE(review): this listing lost tokens during extraction. The source of
    * {@code params}, the declaration of {@code val}, the assignment target, the logger
    * receiver and the key constant are all missing. Restore them from the upstream
    * source before compiling; presumably {@code val} is looked up in {@code params}.
    */
   protected void configureParameters()
   {
      Map params = ;
      if (params == null)
      {
         return;
      }
      // Look for the rmiOnewayMarshalling parameter. (This comment used to say
      // "enableTcpNoDelay", which does not match the log messages below.)
      if (val instanceof String)
      {
         try
         {
            // Boolean.valueOf(String) does not throw: anything other than "true"
            // (ignoring case) silently yields false, so the catch below is not
            // expected to run for a String input.
             = Boolean.valueOf((String)val).booleanValue();
            .debug(this + " setting rmiOnewayMarshalling to " + );
         }
         catch (Exception e)
         {
            .warn(this + " could not convert " + . +
                     " value of " + val + " to a boolean value.  Defaulting to false");
         }
      }
      else if (val != null)
      {
         // A non-String value is reported and otherwise ignored.
         .warn(this + " value of " + . +
                  " (" + val + ") must be a String.  Defaulting to false");
      }
   }
   /**
    * Determines the RMI registry port to use for the given locator.
    * <p>
    * Starts from a default value and overrides it when the locator's parameter map
    * contains a value that parses as an integer. An unparsable value is logged as an
    * error and the default is kept.
    * <p>
    * NOTE(review): the default-port constant, the parameter key, the logger receiver and
    * the cast parenthesis were lost in extraction; restore them from the upstream source.
    *
    * @param locator locator whose parameters may carry a registry port
    * @return the registry port to use
    */
   private int getRegistryPort(InvokerLocator locator)
   {
      int port = .;
      // See if locator contains a specific registry port
      Map params = locator.getParameters();
      if(params != null)
      {
         String value = (Stringparams.get(.);
         if(value != null)
         {
            try
            {
               port = Integer.parseInt(value);
               .debug("Using port " + port + " for rmi registry.");
            }
            catch(NumberFormatException e)
            {
               // Keep the default port; the bad value is only reported.
               .error("Can not set the RMIServerInvoker RMI registry to port " + value + ".  This is not a valid port number.");
            }
         }
      }
      return port;
   }

   
   /**
    * Sets the RMI server stub.
    *
    * @param server the RMI server stub to use for invocations
    */
   public void setServerStub(RMIServerInvokerInf server)
   {
      this. = server;
      .trace(this.);
   }

   
   /**
    * Returns the RMI server stub.
    *
    * @return the RMI server stub, as previously set
    */
   // NOTE(review): the signature line of this method and the field name after "this."
   // were lost in extraction. Presumably this is the getter paired with setServerStub
   // and returns the stored stub - restore both from the upstream source.
   {
      return this.;
   }

   
   /**
    * Subclasses must implement this method to provide a hook to connect to the remote
    * server, if this applies to the specific transport. However, in some transport
    * implementations (such as SOAP) this may not make much difference, since the
    * connection is not persistent across invocations. In these cases, the method should
    * silently return without any processing.
    *
    * @throws ConnectionFailedException if the connection to the remote server fails
    */
   // Tries each candidate home in turn: looks up the RMI registry on the home's host,
   // looks up the stub bound under "remoting/RMIServerInvoker/<port>", and installs it
   // via setServerStub. It stops at the first success. If the final null check holds,
   // a CannotConnectException carrying the last saved exception is thrown.
   //
   // NOTE(review): extraction lost several tokens here. Missing are the argument of
   // getRegistryPort, the receiver of setHomeInUse, the argument of storeLocalConfig,
   // the assignment targets of "= true" / "= false", the operand of the final null check,
   // the logger receivers, commas and cast parentheses. The method signature also no
   // longer carries a throws clause. Restore all of these from the upstream source.
   protected void handleConnect()
   {  
      int registryPort = getRegistryPort();
      Home home = null;
      Exception savedException = null;
      Iterator it = getConnectHomes().iterator();
      
      while (it.hasNext())
      {
         //TODO: -TME Need to figure this out a little better as am now dealing with
         // with 2 ports, the rmi server and the registry.
         try
         {
            home = (Homeit.next();
            String host = home.host;
            final int port = home.port;
            .setHomeInUse(home);
            storeLocalConfig();
            // The debug line prints the home's port, but the registry lookup below
            // uses registryPort.
            .debug(this + " looking up registry: " + host + "," + port);
            final Registry registry = LocateRegistry.getRegistry(hostregistryPort);
            .debug(this + " trying to connect to: " + home);
            // The binding name is derived from the home's (server) port.
            Remote remoteObj = SecurityUtility.lookup(registry"remoting/RMIServerInvoker/" + port);
            .debug("Remote RMI Stub: " + remoteObj);
            setServerStub((RMIServerInvokerInfremoteObj);
             = true;
            break;
         }
         catch(Exception e)
         {
            // Remember the failure, undo the local socket-factory configuration and
            // move on to the next home.
            savedException = e;
             = false;
            RemotingRMIClientSocketFactory.removeLocalConfiguration();
            .trace("Unable to connect RMI invoker client to " + homee);
         }
      }
      // NOTE(review): presumably this checks the server stub - confirm. If there were no
      // homes at all, savedException is still null when the exception is built.
      if ( == null)
      {
         String message = this + " unable to connect RMI invoker client";
         .debug(message);
         throw new CannotConnectException(messagesavedException);
      }
   }
   
   /**
    * Probes the candidate homes one by one and returns the first one that answers.
    * <p>
    * For each home a temporary InvokerLocator is built from the saved locator's protocol,
    * path and parameters plus the home's host and port, and a probe InvocationRequest is
    * sent through {@code invoke}. The {@code finally} block restores the saved locator
    * after every attempt.
    * <p>
    * NOTE(review): when every attempt fails, the method returns the last home that was
    * tried, and returns null only when the home list is empty. Confirm that callers
    * expect a non-null result in the all-failed case.
    * <p>
    * NOTE(review): the locator receivers and assignment targets, the logger receivers,
    * a constant inside the InvocationRequest arguments, commas and a cast parenthesis
    * were lost in extraction.
    *
    * @return the first home that could be contacted (see the note above for the
    *         all-failed case)
    */
   protected Home getUsableAddress()
   {
      InvokerLocator savedLocator = ;
      String protocol = savedLocator.getProtocol();
      String path = savedLocator.getPath();
      Map params = savedLocator.getParameters();
      List homes = .getConnectHomeList();
      Home home = null;
      
      Iterator it = homes.iterator();
      while (it.hasNext())
      {
         try
         {
            home = (Homeit.next();
             = new InvokerLocator(protocolhome.hosthome.portpathparams);
            invoke(new InvocationRequest(nullnull.nullnullnull));
            if (.isTraceEnabled()) .trace(this + " able to contact server at: " + home);
            return home;
         }
         catch (Throwable e)
         {
            // Any failure, including Errors, just means "try the next home". The
            // throwable itself is not logged.
            .debug(this + " unable to contact server at: " + home);
         }
         finally
         {
            // Always put the original locator back, also on the successful return path.
             = savedLocator;
         }
      }
   
      return home;
   }

   
   /**
    * Subclasses must implement this method to provide a hook to disconnect from the
    * remote server, if this applies to the specific transport. However, in some transport
    * implementations (such as SOAP) this may not make much difference, since the
    * connection is not persistent across invocations. In these cases, the method should
    * silently return without any processing.
    */
   // Disconnect hook: removes the local configuration that storeLocalConfig registered
   // with RemotingRMIClientSocketFactory.
   // NOTE(review): the call is shown without arguments; an argument was presumably lost
   // in extraction (the same call in handleConnect is affected) - confirm upstream.
   protected void handleDisconnect()
   {
      RemotingRMIClientSocketFactory.removeLocalConfiguration();
   }
   /**
    * Returns the default data type name for this transport.
    * <p>
    * NOTE(review): the returned constant (class and field name) was lost in extraction;
    * restore it from the upstream source.
    *
    * @return the default data type
    */
   protected String getDefaultDataType()
   {
      return .;
   }
   /**
    * Copies the given configuration into a new HashMap, optionally adds one more entry
    * (see the condition below), and registers the copy with
    * RemotingRMIClientSocketFactory.
    * <p>
    * The copy constructor {@code new HashMap(config)} throws a NullPointerException when
    * {@code config} is null.
    * <p>
    * NOTE(review): the operands of the condition, the argument of
    * {@code isCompleteSocketFactory} and the arguments of {@code put} were lost in
    * extraction. Judging by the existing comment they refer to this invoker's socket
    * factory - confirm upstream.
    *
    * @param config configuration to copy; the caller's map is not modified by the code
    *               shown here
    */
   protected void storeLocalConfig(Map config)
   {
      HashMap localConfig = new HashMap(config);
      // If a specific SocketFactory was passed in, use it.  If a SocketFactory was
      // generated from SSL parameters, discard it.  It will be recreated later by
      // SerializableSSLClientSocketFactory with any additional parameters sent
      // from server.
      if ( != null &&
            ! &&
            AbstractInvoker.isCompleteSocketFactory())
         localConfig.put(.);
      // Save configuration for SerializableSSLClientSocketFactory.
      RemotingRMIClientSocketFactory.addLocalConfiguration(localConfig);
   }
   
   /**
    * Sends one invocation to the remote server and returns the response after passing it
    * through {@code unmarshal}.
    * <p>
    * Steps visible in the code:
    * <ol>
    * <li>Fail with a CannotConnectException when the null check at the top holds.</li>
    * <li>Prepare the payload. When a marshaller other than an RMIMarshaller is supplied,
    *     a MarshallerDecorator decorates the invocation. Any other marshaller serializes
    *     it to a byte array, which is either sent as is or, in the "legacy" branch,
    *     read back into an object first.</li>
    * <li>With a simulated timeout of 0 or less, call the transport directly.</li>
    * <li>Otherwise run the call in the timeout thread pool inside a WaitingTaskWrapper
    *     and inspect what the task left in the holder.</li>
    * </ol>
    * A RemoteException is converted into a CannotConnectException.
    * <p>
    * NOTE(review): extraction lost tokens throughout. Missing are the commas between
    * parameters and arguments, cast parentheses, logger receivers, the operand of the
    * initial null check, the legacy-branch condition, and arguments of
    * {@code callTransport}, {@code getSerializationType}/{@code getClassLoader} and
    * {@code getSimulatedTimeout}. Restore them from the upstream source.
    */
   protected Object transport(String sessionIdObject invocationMap metadataMarshaller marshallerUnMarshaller unmarshaller)
         throws IOExceptionConnectionFailedException
   {
      if(this. == null)
      {
         .debug("Server stub has not been set in RMI invoker client.  See previous errors for details.");
         //throw new IOException("Server stub hasn't been set!");
         throw new CannotConnectException("Server stub has not been set.");
      }
      try
      {
         Object payload = invocation;
         if(marshaller != null && !(marshaller instanceof RMIMarshaller))
         {
            if(marshaller instanceof MarshallerDecorator)
            {
               payload = ((MarshallerDecoratormarshaller).addDecoration(payload);
            }
            else
            {
               // Serialize the invocation into an in-memory buffer.
               ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
               if (marshaller instanceof VersionedMarshaller)
                  ((VersionedMarshallermarshaller).write(payloadbyteOutgetVersion());
               else
                  marshaller.write(payloadbyteOut);
               
               byteOut.close();
               
               // NOTE(review): the condition was lost in extraction; presumably it is the
               // rmiOnewayMarshalling flag - confirm.
               if ()
               {
                  // Legacy treatment, pre 2.4.0.
                  // The bytes just written are read back into an object, which is sent
                  // instead of the byte array.
                  ByteArrayInputStream bais = new ByteArrayInputStream(byteOut.toByteArray());
                  SerializationManager manager = SerializationStreamFactory.getManagerInstance(getSerializationType());
                  ObjectInputStream ois = manager.createInput(baisgetClassLoader());
                  try
                  {
                     // Second close of byteOut (it was already closed above).
                     byteOut.close();
                     payload = SecurityUtility.readObject(ois);
                     // ois is not closed when readObject throws.
                     ois.close();
                  }
                  catch(ClassNotFoundException e)
                  {
                     .debug("Could not marshall invocation payload object " + payloade);
                     // Only the message is carried over; the original exception is not
                     // attached as the cause.
                     throw new IOException(e.getMessage());
                  }
               }
               else
               {
                  payload = byteOut.toByteArray();
               }
            }
         }
         int simulatedTimeout = getSimulatedTimeout(metadata);
         if (simulatedTimeout <= 0)
         {
            // No timeout requested: invoke on the calling thread.
            Object result = SecurityUtility.callTransport(payload);
            return unmarshal(resultunmarshallermetadata);
         }
         else
         {
            if (.isTraceEnabled()) .trace("using simulated timeout: " + simulatedTimeout);
            
            // Mutable holder so the anonymous Runnable can hand back its outcome.
            class Holder {public Object value;}
            final Holder resultHolder = new Holder();
            final Object finalPayload = payload;
            
            Runnable r = new Runnable()
            {
               public void run()
               {
                  try
                  {
                     resultHolder.value = SecurityUtility.callTransport(finalPayload);
                     if (.isTraceEnabled()) .trace("result: " + resultHolder.value);
                  }
                  catch (Exception e)
                  {
                     // The exception itself becomes the "result" and is sorted out by
                     // the caller below.
                     resultHolder.value = e;
                     if (.isTraceEnabled()) .trace("exception: " + e); 
                  }
               }
            };
            
            // BasicThreadPool timeout mechanism depends on the interrupted status of
            // the running thread.
            // Thread.interrupted() clears that status before the task is submitted.
            Thread.interrupted();
            
            ThreadPool pool = getTimeoutThreadPool();
            WaitingTaskWrapper wrapper = new WaitingTaskWrapper(rsimulatedTimeout);
            if (.isTraceEnabled()) .trace("starting task in thread pool");
            pool.runTaskWrapper(wrapper);
            if (.isTraceEnabled()) .trace("task finished in thread pool");
            
            // NOTE(review): unmarshal runs before the null and exception checks below.
            // When the holder is still null (timeout) or contains an exception, and the
            // unmarshaller is neither an RMIUnMarshaller nor a decorator, the byte[]
            // branch of unmarshal is presumably reached with a non-byte[] value.
            // Verify this path.
            Object result = unmarshal(resultHolder.valueunmarshallermetadata);
            if (result == null)
            {
               // A null result is interpreted as "the task did not finish in time".
               // NOTE(review): a legitimate null response is indistinguishable from a
               // timeout here, and the message mentions the http invoker although this
               // is the RMI invoker.
               if (.isTraceEnabled()) .trace("invocation timed out");
               Exception cause = new SocketTimeoutException("timed out");
               throw new CannotConnectException("Can not connect http client invoker."cause);
            }
            else if (result instanceof IOException)
            {
               throw (IOExceptionresult;
            }
            else if (result instanceof RuntimeException)
            {
               throw (RuntimeExceptionresult;
            }
            else
            {
               // Any other captured exception type is returned to the caller as an
               // ordinary result object.
               if (.isTraceEnabled()) .trace("returning result: " + result);
               return result;
            }
         }
      }
      catch(RemoteException e)
      {
         .debug("Error making invocation in RMI client invoker."e);
         throw new CannotConnectException("Error making invocation in RMI client invoker."e);
      }
   }
   
   private int getSimulatedTimeout(Map configurationMap metadata)
   {
      int timeout = -1;
      String connectionTimeout = (Stringconfiguration.get("timeout");
      String invocationTimeout = null;
      if (metadata != nullinvocationTimeout = (Stringmetadata.get("timeout");
      
      if (invocationTimeout != null && invocationTimeout.length() > 0)
      {
         try
         {
            timeout = Integer.parseInt(invocationTimeout);
         }
         catch (NumberFormatException e)
         {
            .warn("Could not set timeout for current invocation because value (" + invocationTimeout + ") is not a number.");
         }
      }
      
      if (timeout < 0 && connectionTimeout != null && connectionTimeout.length() > 0)
      {
         try
         {
            timeout = Integer.parseInt(connectionTimeout);
         }
         catch (NumberFormatException e)
         {
            .warn("Could not set timeout for http client connection because value (" + connectionTimeout + ") is not a number.");
         }
      }
      
      if (timeout < 0)
         timeout = 0;
      return timeout;
   }
   
   
   /**
    * Converts a raw transport response into the object handed back to the caller.
    * <p>
    * The input is returned unchanged when no unmarshaller is given, when it is an
    * RMIUnMarshaller, or when the third (garbled) guard condition is false. Otherwise an
    * UnMarshallerDecorator strips its decoration. Any other unmarshaller reads the
    * response, which is cast to {@code byte[]}, from a ByteArrayInputStream.
    * <p>
    * NOTE(review): in the byte[] branch a null {@code o} reaches
    * {@code new ByteArrayInputStream(null)} and a non-array {@code o} fails the cast.
    * {@code transport()} calls this method with the holder value before checking it for
    * null or exceptions, so verify that path.
    * <p>
    * NOTE(review): the operand after the last {@code !}, the commas, cast parentheses,
    * the parenthesis before {@code throws} and the logger receiver were lost in
    * extraction. The operand is presumably the rmiOnewayMarshalling flag - confirm.
    */
   protected Object unmarshal(Object oUnMarshaller unmarshallerMap metadatathrows IOException
   {
      Object result = o;
      if(unmarshaller != null && !(unmarshaller instanceof RMIUnMarshaller) && !)
      {
         if(unmarshaller instanceof UnMarshallerDecorator)
         {
            result = ((UnMarshallerDecoratorunmarshaller).removeDecoration(o);
         }
         else
         {  
            // The response is expected to be the serialized bytes.
            byte[] byteIn = (byte[]) o;
            ByteArrayInputStream is = new ByteArrayInputStream(byteIn);
            try
            {
               if (unmarshaller instanceof VersionedUnMarshaller)
               {
                  result = ((VersionedUnMarshallerunmarshaller).read(ismetadatagetVersion());
               }
               else
               {
                  result = unmarshaller.read(ismetadata);
               }
            }
            catch(ClassNotFoundException e)
            {
               .debug("Could not unmarshall invocation response" + oe);
               // Only the message is carried over; the original exception is not
               // attached as the cause.
               throw new IOException(e.getMessage());
            }
         }
      }
      
      return result;
   }
   
   
   /**
    * Gets the thread pool being used for simulating timeouts. If one has not been
    * specifically set via configuration or via a call to set it, this method will always
    * return an instance of org.jboss.util.threadpool.BasicThreadPool.
    */
   // Returns the thread pool used for simulated timeouts. Inside a synchronized block,
   // when the null check holds, a BasicThreadPool is created. Its maximum pool size and
   // (optionally) its maximum queue size are taken from two String-valued parameters
   // parsed as integers, and its blocking mode is set to RUN.
   //
   // NOTE(review): extraction lost the lock expression, the field being tested, assigned
   // and returned, the initial value of maxNumberThreads, the receiver and key of both
   // get(...) calls, the logger receivers and two cast parentheses. Presumably these are
   // timeoutThreadPoolLock, timeoutThreadPool, MAX_NUM_TIMEOUT_THREADS_DEFAULT and the
   // MAX_NUM_TIMEOUT_THREADS / MAX_TIMEOUT_QUEUE_SIZE keys - confirm upstream.
   public ThreadPool getTimeoutThreadPool()
   {
      synchronized ()
      {
         if ( == null)
         {
            int maxNumberThreads = ;
            // -1 means "leave the pool's own queue size untouched" (see the check below).
            int maxTimeoutQueueSize = -1;
            
            // NOTE(review): the pool name says "HTTP" although this is the RMI invoker.
            BasicThreadPool pool = new BasicThreadPool("HTTP timeout");
            .debug(this + " created new simulated timeout thread pool: " + pool);
            Object param = .get();
            if (param instanceof String)
            {
               try
               {
                  maxNumberThreads = Integer.parseInt((Stringparam);
               }
               catch (NumberFormatException  e)
               {
                  // Keep the initial value when the parameter is not numeric.
                  .warn("maxNumberThreads parameter has invalid format: " + param);
               }
            }
            else if (param != null)
            {
               .warn("maxNumberThreads parameter must be a string in integer format: " + param);
            }
            param = .get();
            if (param instanceof String)
            {
               try
               {
                  maxTimeoutQueueSize = Integer.parseInt((Stringparam);
               }
               catch (NumberFormatException  e)
               {
                  .warn("maxTimeoutQueueSize parameter has invalid format: " + param);
               }
            }
            else if (param != null)
            {
               .warn("maxTimeoutQueueSize parameter must be a string in integer format: " + param);
            }
            pool.setMaximumPoolSize(maxNumberThreads);
            // Only a positive queue size is applied.
            if (maxTimeoutQueueSize > 0)
            {
               pool.setMaximumQueueSize(maxTimeoutQueueSize);
            }
            pool.setBlockingMode(BlockingMode.RUN);
             = pool;
         }
      }
      
      // The value is read again outside the synchronized block.
      return ;
   }
   
   
   
   /**
    * When a WaitingTaskWrapper is run in a BasicThreadPool, the calling thread will
    * block for the designated timeout period.
    */
   static class WaitingTaskWrapper extends RunnableTaskWrapper
   {
      long completeTimeout;
      
      public WaitingTaskWrapper(Runnable runnablelong completeTimeout)
      {
         super(runnable, 0, completeTimeout);
         this. = completeTimeout;
      }
      public int getTaskWaitType()
      {
         return Task.WAIT_FOR_COMPLETE;
      }
      public String toString()
      {
         return "WaitingTaskWrapper[" +  + "]";
      }
   }
New to GrepCode? Check out our FAQ X