Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * Copyright (C) FuseSource, Inc.
 * http://fusesource.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 package org.fusesource.fabric.dosgi.tcp;
 
 
 import java.util.Map;
 
 public class ServerInvokerImpl implements ServerInvokerDispatched {
 
     protected static final Logger LOGGER = LoggerFactory.getLogger(ServerInvokerImpl.class);
     static private final HashMap<StringClassPRIMITIVE_TO_CLASS = new HashMap<StringClass>(8, 1.0F);
     static {
         .put("Z"boolean.class);
         .put("B"byte.class);
         .put("C"char.class);
         .put("S"short.class);
         .put("I"int.class);
         .put("J"long.class);
         .put("F"float.class);
         .put("D"double.class);
     }
 
     protected final ExecutorService blockingExecutor = Executors.newFixedThreadPool(8);
     protected final DispatchQueue queue;
     protected final TransportServer server;
     protected final Map<UTF8BufferServiceFactoryHolderholders = new HashMap<UTF8BufferServiceFactoryHolder>();
 
     static class MethodData {
 
         private final SerializationStrategy serializationStrategy;
         final InvocationStrategy invocationStrategy;
         final Method method;
 
         MethodData(InvocationStrategy invocationStrategySerializationStrategy serializationStrategyMethod method) {
             this. = invocationStrategy;
             this. = serializationStrategy;
             this. = method;
         }
     }
 
     class ServiceFactoryHolder {
 
         private final ServiceFactory factory;
         private final ClassLoader loader;
         private final Class clazz;
         private HashMap<BufferMethodDatamethod_cache = new HashMap<BufferMethodData>();
 
         public ServiceFactoryHolder(ServiceFactory factoryClassLoader loader) {
             this. = factory;
             this. = loader;
             Object o = factory.get();
              = o.getClass();
             factory.unget();
         }
 
             // NOTE(review): the signature line that opens this method is missing from
             // this listing. The outer class calls holder.getMethodData(encoded_method)
             // with a Buffer and stores the result in a MethodData, so this is presumably
             // that method — confirm against the upstream source. Several receivers and
             // constants below (".get", ".getMethod", "= .;", ".put") also lost their
             // identifiers in extraction and must be restored before this compiles.
             //
             // Purpose: return the MethodData for an encoded method signature, resolving
             // and caching it on first use.
             MethodData rc = .get(data);
             ifrc == null ) {
                 // The signature is comma separated text: the method name first,
                 // followed by one encoded type per parameter.
                 String[] parts = data.utf8().toString().split(",");
                 String name = parts[0];
                 Class params[] = new Class[parts.length-1];
                 forint  i=0; i < params.lengthi++) {
                     params[i] = decodeClass(parts[i+1]);
                 }
                 // Reflective lookup by name and parameter types (receiver lost).
                 Method method = .getMethod(nameparams);
                // A Serialization annotation on the method names the strategy to use;
                // a name that cannot be found is reported as a RuntimeException.
                Serialization annotation = method.getAnnotation(Serialization.class);
                SerializationStrategy serializationStrategy;
                ifannotation!=null ) {
                    serializationStrategy = .get(annotation.value());
                    ifserializationStrategy==null ) {
                        throw new RuntimeException("Could not find the serialization strategy named: "+annotation.value());
                    }
                } else {
                    // NOTE(review): default strategy constant lost in extraction.
                    serializationStrategy = .;
                }
                // The invocation strategy depends on whether the method is async,
                // as decided by AsyncInvocationStrategy.isAsyncMethod.
                final InvocationStrategy invocationStrategy;
                if( AsyncInvocationStrategy.isAsyncMethod(method) ) {
                    // NOTE(review): async strategy constant lost in extraction.
                    invocationStrategy = .;
                } else {
                    // NOTE(review): non-async strategy constant lost in extraction.
                    invocationStrategy = .;
                }
                rc = new MethodData(invocationStrategyserializationStrategymethod);
                // Store the result so later requests with the same signature skip the lookup.
                .put(datarc);
            }
            return rc;
        }
        private Class<?> decodeClass(String sthrows ClassNotFoundException {
            ifs.startsWith("[")) {
                Class<?> nested = decodeClass(s.substring(1));
                return Array.newInstance(nested,0).getClass();
            }
            String c = s.substring(0,1);
            ifc.equals("L") ) {
                return .loadClass(s.substring(1));
            } else {
                return .get(c);
            }
        }
    }
    public ServerInvokerImpl(String addressDispatchQueue queueMap<StringSerializationStrategyserializationStrategiesthrows Exception {
        this. = queue;
        this. = serializationStrategies;
        this. = new TcpTransportFactory().bind(address);
        this..setDispatchQueue(queue);
        this..setAcceptListener(new InvokerAcceptListener());
    }
        // NOTE(review): the signature line that opens this method is missing from this
        // listing, and the receiver between "this." and ".getSocketAddress()" was lost
        // as well. Presumably this delegates to the transport server field — confirm
        // the method name, return type and receiver against the upstream source.
        return this..getSocketAddress();
    }
    public DispatchQueue queue() {
        return ;
    }
    public String getConnectAddress() {
        return this..getConnectAddress();
    }
    public void registerService(final String idfinal ServiceFactory servicefinal ClassLoader classLoader) {
        queue().execute(new Runnable() {
            public void run() {
                .put(new UTF8Buffer(id), new ServiceFactoryHolder(serviceclassLoader));
            }
        });
    }
    public void unregisterService(final String id) {
        queue().execute(new Runnable() {
            public void run() {
                .remove(new UTF8Buffer(id));
            }
        });
    }
    public void start() throws Exception {
        start(null);
    }
    public void start(Runnable onCompletethrows Exception {
        this..start(onComplete);
    }
    public void stop() {
        stop(null);
    }
    public void stop(final Runnable onComplete) {
        this..stop(new Runnable() {
            public void run() {
                .shutdown();
                if (onComplete != null) {
                    onComplete.run();
                }
            }
        });
    }
    protected void onCommand(final Transport transportObject data) {
        try {
            final DataByteArrayInputStream bais = new DataByteArrayInputStream((Bufferdata);
            final int size = bais.readInt();
            final long correlation = bais.readVarLong();
            // Use UTF8Buffer instead of string to avoid encoding/decoding UTF-8 strings
            // for every request.
            final UTF8Buffer service = readBuffer(bais).utf8();
            final Buffer encoded_method = readBuffer(bais);
            final ServiceFactoryHolder holder = .get(service);
            final MethodData methodData = holder.getMethodData(encoded_method);
            final Object svc = holder.factory.get();
            Runnable task = new Runnable() {
                public void run() {
                    final DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
                    try {
                        baos.writeInt(0); // make space for the size field.
                        baos.writeVarLong(correlation);
                    } catch (IOException e) { // should not happen
                        throw new RuntimeException(e);
                    }
                    // Lets decode the remaining args on the target's executor
                    // to take cpu load off the
                    methodData.invocationStrategy.service(methodData.serializationStrategyholder.loadermethodData.methodsvcbaisbaosnew Runnable() {
                        public void run() {
                            holder.factory.unget();
                            final Buffer command = baos.toBuffer();
                            // Update the size field.
                            BufferEditor editor = command.buffer().bigEndianEditor();
                            editor.writeInt(command.length);
                            queue().execute(new Runnable() {
                                public void run() {
                                    transport.offer(command);
                                }
                            });
                        }
                    });
                }
            };
            Executor executor;
            ifsvc instanceof Dispatched ) {
                executor = ((Dispatched)svc).queue();
            } else {
                executor = ;
            }
            executor.execute(task);
        } catch (Exception e) {
            .info("Error while reading request"e);
        }
    }
    private Buffer readBuffer(DataByteArrayInputStream baisthrows IOException {
        byte b[] = new byte[bais.readVarInt()];
        bais.readFully(b);
        return new Buffer(b);
    }
    class InvokerAcceptListener implements TransportAcceptListener {
        public void onAccept(TransportServer transportServerTcpTransport transport) {
            transport.setProtocolCodec(new LengthPrefixedCodec());
            transport.setDispatchQueue(queue());
            transport.setTransportListener(new InvokerTransportListener());
            transport.start();
        }
        public void onAcceptError(TransportServer transportServerException error) {
            .info("Error accepting incoming connection"error);
        }
    }
    class InvokerTransportListener implements TransportListener {
        public void onTransportCommand(Transport transportObject command) {
            ServerInvokerImpl.this.onCommand(transportcommand);
        }
        public void onRefill(Transport transport) {
        }
        public void onTransportFailure(Transport transportIOException error) {
            if (!transport.isDisposed() && !(error instanceof EOFException)) {
                .info("Transport failure"error);
            }
        }
        public void onTransportConnected(Transport transport) {
            transport.resumeRead();
        }
        public void onTransportDisconnected(Transport transport) {
        }
    }
New to GrepCode? Check out our FAQ X