Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright (C) 2011, FuseSource Corp. All rights reserved. http://fusesource.com The software in this package is published under the terms of the CDDL license a copy of which has been included with this distribution in the license.txt file.
  
  package org.fusesource.fabric.dosgi.tcp;
 
 
 import java.util.Map;
 
 public class ServerInvokerImpl implements ServerInvokerDispatched {
 
     protected static final Logger LOGGER = LoggerFactory.getLogger(ServerInvokerImpl.class);
     static private final HashMap<StringClassPRIMITIVE_TO_CLASS = new HashMap<StringClass>(8, 1.0F);
     static {
         .put("Z"boolean.class);
         .put("B"byte.class);
         .put("C"char.class);
         .put("S"short.class);
         .put("I"int.class);
         .put("J"long.class);
         .put("F"float.class);
         .put("D"double.class);
     }
 
     protected final ExecutorService blockingExecutor = Executors.newFixedThreadPool(8);
     protected final DispatchQueue queue;
     protected final TransportServer server;
     protected final Map<UTF8BufferServiceFactoryHolderholders = new HashMap<UTF8BufferServiceFactoryHolder>();
 
     static class MethodData {
 
         private final SerializationStrategy serializationStrategy;
         final InvocationStrategy invocationStrategy;
         final Method method;
 
         MethodData(InvocationStrategy invocationStrategySerializationStrategy serializationStrategyMethod method) {
             this. = invocationStrategy;
             this. = serializationStrategy;
             this. = method;
         }
     }
 
     class ServiceFactoryHolder {
 
         private final ServiceFactory factory;
         private final ClassLoader loader;
         private final Class clazz;
         private HashMap<BufferMethodDatamethod_cache = new HashMap<BufferMethodData>();
 
         public ServiceFactoryHolder(ServiceFactory factoryClassLoader loader) {
             this. = factory;
             this. = loader;
             Object o = factory.get();
              = o.getClass();
             factory.unget();
         }
 
             MethodData rc = .get(data);
             ifrc == null ) {
                 String[] parts = data.utf8().toString().split(",");
                 String name = parts[0];
                 Class params[] = new Class[parts.length-1];
                 forint  i=0; i < params.lengthi++) {
                     params[i] = decodeClass(parts[i+1]);
                 }
                 Method method = .getMethod(nameparams);
 
 
                 Serialization annotation = method.getAnnotation(Serialization.class);
                 SerializationStrategy serializationStrategy;
                 ifannotation!=null ) {
                     serializationStrategy = .get(annotation.value());
                     ifserializationStrategy==null ) {
                         throw new RuntimeException("Could not find the serialization strategy named: "+annotation.value());
                    }
                } else {
                    serializationStrategy = .;
                }
                final InvocationStrategy invocationStrategy;
                if( AsyncInvocationStrategy.isAsyncMethod(method) ) {
                    invocationStrategy = .;
                } else {
                    invocationStrategy = .;
                }
                rc = new MethodData(invocationStrategyserializationStrategymethod);
                .put(datarc);
            }
            return rc;
        }
        private Class<?> decodeClass(String sthrows ClassNotFoundException {
            ifs.startsWith("[")) {
                Class<?> nested = decodeClass(s.substring(1));
                return Array.newInstance(nested,0).getClass();
            }
            String c = s.substring(0,1);
            ifc.equals("L") ) {
                return .loadClass(s.substring(1));
            } else {
                return .get(c);
            }
        }
    }
    public ServerInvokerImpl(String addressDispatchQueue queueMap<StringSerializationStrategyserializationStrategiesthrows Exception {
        this. = queue;
        this. = serializationStrategies;
        this. = new TcpTransportFactory().bind(address);
        this..setDispatchQueue(queue);
        this..setAcceptListener(new InvokerAcceptListener());
    }
        return this..getSocketAddress();
    }
    public DispatchQueue queue() {
        return ;
    }
    public String getConnectAddress() {
        return this..getConnectAddress();
    }
    public void registerService(final String idfinal ServiceFactory servicefinal ClassLoader classLoader) {
        queue().execute(new Runnable() {
            public void run() {
                .put(new UTF8Buffer(id), new ServiceFactoryHolder(serviceclassLoader));
            }
        });
    }
    public void unregisterService(final String id) {
        queue().execute(new Runnable() {
            public void run() {
                .remove(new UTF8Buffer(id));
            }
        });
    }
    public void start() throws Exception {
        start(null);
    }
    public void start(Runnable onCompletethrows Exception {
        this..start(onComplete);
    }
    public void stop() {
        stop(null);
    }
    public void stop(final Runnable onComplete) {
        this..stop(new Runnable() {
            public void run() {
                .shutdown();
                if (onComplete != null) {
                    onComplete.run();
                }
            }
        });
    }
    protected void onCommand(final Transport transportObject data) {
        try {
            final DataByteArrayInputStream bais = new DataByteArrayInputStream((Bufferdata);
            final int size = bais.readInt();
            final long correlation = bais.readVarLong();
            // Use UTF8Buffer instead of string to avoid encoding/decoding UTF-8 strings
            // for every request.
            final UTF8Buffer service = readBuffer(bais).utf8();
            final Buffer encoded_method = readBuffer(bais);
            final ServiceFactoryHolder holder = .get(service);
            final MethodData methodData = holder.getMethodData(encoded_method);
            final Object svc = holder.factory.get();
            Runnable task = new Runnable() {
                public void run() {
                    final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
                    try {
                        baos.writeInt(0); // make space for the size field.
                        baos.writeVarLong(correlation);
                    } catch (IOException e) { // should not happen
                        throw new RuntimeException(e);
                    }
                    // Lets decode the remaining args on the target's executor
                    // to take cpu load off the
                    methodData.invocationStrategy.service(methodData.serializationStrategyholder.loadermethodData.methodsvcbaisbaosnew Runnable() {
                        public void run() {
                            holder.factory.unget();
                            final Buffer command = baos.toBuffer();
                            // Update the size field.
                            BufferEditor editor = command.buffer().bigEndianEditor();
                            editor.writeInt(command.length);
                            queue().execute(new Runnable() {
                                public void run() {
                                    transport.offer(command);
                                }
                            });
                        }
                    });
                }
            };
            Executor executor;
            ifsvc instanceof Dispatched ) {
                executor = ((Dispatched)svc).queue();
            } else {
                executor = ;
            }
            executor.execute(task);
        } catch (Exception e) {
            .info("Error while reading request"e);
        }
    }
    private Buffer readBuffer(DataByteArrayInputStream baisthrows IOException {
        byte b[] = new byte[bais.readVarInt()];
        bais.readFully(b);
        return new Buffer(b);
    }
    class InvokerAcceptListener implements TransportAcceptListener {
        public void onAccept(TransportServer transportServerTcpTransport transport) {
            transport.setProtocolCodec(new LengthPrefixedCodec());
            transport.setDispatchQueue(queue());
            transport.setTransportListener(new InvokerTransportListener());
            transport.start();
        }
        public void onAcceptError(TransportServer transportServerException error) {
            .info("Error accepting incoming connection"error);
        }
    }
    class InvokerTransportListener implements TransportListener {
        public void onTransportCommand(Transport transportObject command) {
            ServerInvokerImpl.this.onCommand(transportcommand);
        }
        public void onRefill(Transport transport) {
        }
        public void onTransportFailure(Transport transportIOException error) {
            if (!transport.isDisposed() && !(error instanceof EOFException)) {
                .info("Transport failure"error);
            }
        }
        public void onTransportConnected(Transport transport) {
            transport.resumeRead();
        }
        public void onTransportDisconnected(Transport transport) {
        }
    }
New to GrepCode? Check out our FAQ X