Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright (C) 2011, FuseSource Corp. All rights reserved. http://fusesource.com The software in this package is published under the terms of the CDDL license a copy of which has been included with this distribution in the license.txt file.
  
  package org.fusesource.fabric.dosgi.tcp;
 
 
 import java.util.Map;
 
 public class ClientInvokerImpl implements ClientInvokerDispatched {
 
     public static final long DEFAULT_TIMEOUT = ..toMillis(5);
 
     protected static final Logger LOGGER = LoggerFactory.getLogger(ClientInvokerImpl.class);
 
     private final static HashMap<Class,StringCLASS_TO_PRIMITIVE = new HashMap<ClassString>(8, 1.0F);
 
     static {
         .put(boolean.class,"Z");
         .put(byte.class,"B");
         .put(char.class,"C");
         .put(short.class,"S");
         .put(int.class,"I");
         .put(long.class,"J");
         .put(float.class,"F");
         .put(double.class,"D");
     }
 
     protected final AtomicLong correlationGenerator = new AtomicLong();
     protected final DispatchQueue queue;
     protected final Map<StringTransportPooltransports = new HashMap<StringTransportPool>();
     protected final AtomicBoolean running = new AtomicBoolean(false);
     protected final Map<LongResponseFuturerequests = new HashMap<LongResponseFuture>();
     protected final long timeout;
 
     public ClientInvokerImpl(DispatchQueue queueMap<StringSerializationStrategyserializationStrategies) {
         this(queueserializationStrategies);
     }
 
     public ClientInvokerImpl(DispatchQueue queuelong timeoutMap<StringSerializationStrategyserializationStrategies) {
         this. = queue;
         this. = timeout;
         this. = serializationStrategies;
     }
 
     public DispatchQueue queue() {
         return ;
     }
 
     public void start() throws Exception {
         start(null);
     }
 
     public void start(Runnable onCompletethrows Exception {
         .set(true);
         if (onComplete != null) {
             onComplete.run();
         }
     }
 
     public void stop() {
         stop(null);
     }
 
     public void stop(final Runnable onComplete) {
         if (.compareAndSet(truefalse)) {
             queue().execute(new Runnable() {
                 public void run() {
                     final AtomicInteger latch = new AtomicInteger(.size());
                     final Runnable coutDown = new Runnable() {
                         public void run() {
                             if (latch.decrementAndGet() == 0) {
                                 if (onComplete != null) {
                                    onComplete.run();
                                }
                            }
                        }
                    };
                    for (TransportPool pool : .values()) {
                        pool.stop(coutDown);
                    }
                }
            });
        } else {
            if (onComplete != null) {
                onComplete.run();
            }
        }
    }
    public InvocationHandler getProxy(String addressString serviceClassLoader classLoader) {
        return new ProxyInvocationHandler(addressserviceclassLoader);
    }
    protected void onCommand(Object data) {
        try {
            DataByteArrayInputStream bais = new DataByteArrayInputStream( (Bufferdata);
            int size = bais.readInt();
            long correlation = bais.readVarLong();
            ResponseFuture response = .remove(correlation);
            ifresponse!=null ) {
                response.set(bais);
            }
        } catch (Exception e) {
            .info("Error while reading response"e);
        }
    }
    static final WeakHashMap<MethodMethodDatamethod_cache = new WeakHashMap<MethodMethodData>();
    static class MethodData {
        private final SerializationStrategy serializationStrategy;
        final Buffer signature;
        final InvocationStrategy invocationStrategy;
        MethodData(InvocationStrategy invocationStrategySerializationStrategy serializationStrategyBuffer signature) {
            this. = invocationStrategy;
            this. = serializationStrategy;
            this. = signature;
        }
    }
    private MethodData getMethodData(Method methodthrows IOException {
        MethodData rc = null;
        synchronized () {
            rc = .get(method);
        }
        ifrc==null ) {
            StringBuilder sb = new StringBuilder();
            sb.append(method.getName());
            sb.append(",");
            Class<?>[] types = method.getParameterTypes();
            for(int i=0; i < types.lengthi++) {
                ifi!=0 ) {
                    sb.append(",");
                }
                sb.append(encodeClassName(types[i]));
            }
            Buffer signature = new UTF8Buffer(sb.toString()).buffer();
            Serialization annotation = method.getAnnotation(Serialization.class);
            SerializationStrategy serializationStrategy;
            ifannotation!=null ) {
                serializationStrategy = .get(annotation.value());
                ifserializationStrategy==null ) {
                    throw new RuntimeException("Could not find the serialization strategy named: "+annotation.value());
                }
            } else {
                serializationStrategy = .;
            }
            final InvocationStrategy strategy;
            if( AsyncInvocationStrategy.isAsyncMethod(method) ) {
                strategy = .;
            } else {
                strategy = .;
            }
            rc = new MethodData(strategyserializationStrategysignature);
            synchronized () {
                .put(methodrc);
            }
        }
        return rc;
    }
    String encodeClassName(Class<?> type) {
        iftype.getComponentType()!=null ) {
            return "["encodeClassName(type.getComponentType());
        }
        iftype.isPrimitive() ) {
            return .get(type);
        } else {
            return "L"+type.getName();
        }
    }
    protected Object request(ProxyInvocationHandler handlerfinal String addressfinal UTF8Buffer servicefinal ClassLoader classLoaderfinal Method methodfinal Object[] argsthrows Exception {
        final long correlation = .incrementAndGet();
        // Encode the request before we try to pass it onto
        // IO layers so that #1 we can report encoding error back to the caller
        // and #2 reduce CPU load done in the execution queue since it's
        // serially executed.
        DataByteArrayOutputStream baos = new DataByteArrayOutputStream((int) (handler.lastRequestSize*1.10));
        baos.writeInt(0); // we don't know the size yet...
        baos.writeVarLong(correlation);
        writeBuffer(baosservice);
        MethodData methodData = getMethodData(method);
        writeBuffer(baosmethodData.signature);
        final ResponseFuture future = methodData.invocationStrategy.request(methodData.serializationStrategyclassLoadermethodargsbaos);
        // toBuffer() is better than toByteArray() since it avoids an
        // array copy.
        final Buffer command = baos.toBuffer();
        // Update the field size.
        BufferEditor editor = command.buffer().bigEndianEditor();
        editor.writeInt(command.length);
        handler.lastRequestSize = command.length;
        queue().execute(new Runnable() {
            public void run() {
                try {
                    TransportPool pool = .get(address);
                    if (pool == null) {
                        pool = new InvokerTransportPool(addressqueue());
                        .put(addresspool);
                        pool.start();
                    }
                    .put(correlationfuture);
                    pool.offer(command);
                } catch (Exception e) {
                    .info("Error while sending request"e);
                }
            }
        });
        // TODO: make that configurable, that's only for tests
        return future.get(.);
    }
    private void writeBuffer(DataByteArrayOutputStream baosBuffer valuethrows IOException {
        baos.writeVarInt(value.length);
        baos.write(value);
    }
    protected class ProxyInvocationHandler implements InvocationHandler {
        final String address;
        final UTF8Buffer service;
        final ClassLoader classLoader;
        int lastRequestSize = 250;
        public ProxyInvocationHandler(String addressString serviceClassLoader classLoader) {
            this. = address;
            this. = new UTF8Buffer(service);
            this. = classLoader;
        }
        public Object invoke(Object proxyMethod methodObject[] argsthrows Throwable {
            return request(thismethodargs);
        }
    }
    protected class InvokerTransportPool extends TransportPool {
        public InvokerTransportPool(String uriDispatchQueue queue) {
            super(uriqueue. << 1);
        }
        @Override
        protected Transport createTransport(String urithrows Exception {
            return new TcpTransportFactory().connect(uri);
        }
        @Override
        protected ProtocolCodec createCodec() {
            return new LengthPrefixedCodec();
        }
        @Override
        protected void onCommand(Object command) {
            ClientInvokerImpl.this.onCommand(command);
        }
    }
New to GrepCode? Check out our FAQ X