Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/**
 * Copyright (C) FuseSource, Inc.
 * http://fusesource.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 package org.fusesource.fabric.dosgi.tcp;
 
 
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class ClientInvokerImpl implements ClientInvokerDispatched {
 
     public static final long DEFAULT_TIMEOUT = ..toMillis(5);
 
     protected static final Logger LOGGER = LoggerFactory.getLogger(ClientInvokerImpl.class);
 
     private final static HashMap<Class,StringCLASS_TO_PRIMITIVE = new HashMap<ClassString>(8, 1.0F);
 
     static {
         .put(boolean.class,"Z");
         .put(byte.class,"B");
         .put(char.class,"C");
         .put(short.class,"S");
         .put(int.class,"I");
         .put(long.class,"J");
         .put(float.class,"F");
         .put(double.class,"D");
     }
 
     protected final AtomicLong correlationGenerator = new AtomicLong();
     protected final DispatchQueue queue;
     protected final Map<StringTransportPooltransports = new HashMap<StringTransportPool>();
     protected final AtomicBoolean running = new AtomicBoolean(false);
     protected final Map<LongResponseFuturerequests = new HashMap<LongResponseFuture>();
     protected final long timeout;
 
     public ClientInvokerImpl(DispatchQueue queueMap<StringSerializationStrategyserializationStrategies) {
         this(queueserializationStrategies);
     }
 
     public ClientInvokerImpl(DispatchQueue queuelong timeoutMap<StringSerializationStrategyserializationStrategies) {
         this. = queue;
         this. = timeout;
         this. = serializationStrategies;
     }
 
     public DispatchQueue queue() {
         return ;
     }
 
     public void start() throws Exception {
         start(null);
     }
 
     public void start(Runnable onCompletethrows Exception {
         .set(true);
         if (onComplete != null) {
             onComplete.run();
         }
     }
 
     public void stop() {
         stop(null);
     }
 
     public void stop(final Runnable onComplete) {
        if (.compareAndSet(truefalse)) {
            queue().execute(new Runnable() {
                public void run() {
                    final AtomicInteger latch = new AtomicInteger(.size());
                    final Runnable coutDown = new Runnable() {
                        public void run() {
                            if (latch.decrementAndGet() == 0) {
                                if (onComplete != null) {
                                    onComplete.run();
                                }
                            }
                        }
                    };
                    for (TransportPool pool : .values()) {
                        pool.stop(coutDown);
                    }
                }
            });
        } else {
            if (onComplete != null) {
                onComplete.run();
            }
        }
    }
    public InvocationHandler getProxy(String addressString serviceClassLoader classLoader) {
        return new ProxyInvocationHandler(addressserviceclassLoader);
    }
    protected void onCommand(Object data) {
        try {
            DataByteArrayInputStream bais = new DataByteArrayInputStream( (Bufferdata);
            int size = bais.readInt();
            long correlation = bais.readVarLong();
            ResponseFuture response = .remove(correlation);
            ifresponse!=null ) {
                response.set(bais);
            }
        } catch (Exception e) {
            .info("Error while reading response"e);
        }
    }
    static final WeakHashMap<MethodMethodDatamethod_cache = new WeakHashMap<MethodMethodData>();
    static class MethodData {
        private final SerializationStrategy serializationStrategy;
        final Buffer signature;
        final InvocationStrategy invocationStrategy;
        MethodData(InvocationStrategy invocationStrategySerializationStrategy serializationStrategyBuffer signature) {
            this. = invocationStrategy;
            this. = serializationStrategy;
            this. = signature;
        }
    }
    private MethodData getMethodData(Method methodthrows IOException {
        MethodData rc = null;
        synchronized () {
            rc = .get(method);
        }
        ifrc==null ) {
            StringBuilder sb = new StringBuilder();
            sb.append(method.getName());
            sb.append(",");
            Class<?>[] types = method.getParameterTypes();
            for(int i=0; i < types.lengthi++) {
                ifi!=0 ) {
                    sb.append(",");
                }
                sb.append(encodeClassName(types[i]));
            }
            Buffer signature = new UTF8Buffer(sb.toString()).buffer();
            Serialization annotation = method.getAnnotation(Serialization.class);
            SerializationStrategy serializationStrategy;
            ifannotation!=null ) {
                serializationStrategy = .get(annotation.value());
                ifserializationStrategy==null ) {
                    throw new RuntimeException("Could not find the serialization strategy named: "+annotation.value());
                }
            } else {
                serializationStrategy = .;
            }
            final InvocationStrategy strategy;
            if( AsyncInvocationStrategy.isAsyncMethod(method) ) {
                strategy = .;
            } else {
                strategy = .;
            }
            rc = new MethodData(strategyserializationStrategysignature);
            synchronized () {
                .put(methodrc);
            }
        }
        return rc;
    }
    String encodeClassName(Class<?> type) {
        iftype.getComponentType()!=null ) {
            return "["encodeClassName(type.getComponentType());
        }
        iftype.isPrimitive() ) {
            return .get(type);
        } else {
            return "L"+type.getName();
        }
    }
    protected Object request(ProxyInvocationHandler handlerfinal String addressfinal UTF8Buffer servicefinal ClassLoader classLoaderfinal Method methodfinal Object[] argsthrows Exception {
        final long correlation = .incrementAndGet();
        // Encode the request before we try to pass it onto
        // IO layers so that #1 we can report encoding error back to the caller
        // and #2 reduce CPU load done in the execution queue since it's
        // serially executed.
        DataByteArrayOutputStream baos = new DataByteArrayOutputStream((int) (handler.lastRequestSize*1.10));
        baos.writeInt(0); // we don't know the size yet...
        baos.writeVarLong(correlation);
        writeBuffer(baosservice);
        MethodData methodData = getMethodData(method);
        writeBuffer(baosmethodData.signature);
        final ResponseFuture future = methodData.invocationStrategy.request(methodData.serializationStrategyclassLoadermethodargsbaos);
        // toBuffer() is better than toByteArray() since it avoids an
        // array copy.
        final Buffer command = baos.toBuffer();
        // Update the field size.
        BufferEditor editor = command.buffer().bigEndianEditor();
        editor.writeInt(command.length);
        handler.lastRequestSize = command.length;
        queue().execute(new Runnable() {
            public void run() {
                try {
                    TransportPool pool = .get(address);
                    if (pool == null) {
                        pool = new InvokerTransportPool(addressqueue());
                        .put(addresspool);
                        pool.start();
                    }
                    .put(correlationfuture);
                    pool.offer(command);
                } catch (Exception e) {
                    .info("Error while sending request"e);
                }
            }
        });
        // TODO: make that configurable, that's only for tests
        return future.get(.);
    }
    private void writeBuffer(DataByteArrayOutputStream baosBuffer valuethrows IOException {
        baos.writeVarInt(value.length);
        baos.write(value);
    }
    protected class ProxyInvocationHandler implements InvocationHandler {
        final String address;
        final UTF8Buffer service;
        final ClassLoader classLoader;
        int lastRequestSize = 250;
        public ProxyInvocationHandler(String addressString serviceClassLoader classLoader) {
            this. = address;
            this. = new UTF8Buffer(service);
            this. = classLoader;
        }
        public Object invoke(Object proxyMethod methodObject[] argsthrows Throwable {
            return request(thismethodargs);
        }
    }
    protected class InvokerTransportPool extends TransportPool {
        public InvokerTransportPool(String uriDispatchQueue queue) {
            super(uriqueue. << 1);
        }
        @Override
        protected Transport createTransport(String urithrows Exception {
            return new TcpTransportFactory().connect(uri);
        }
        @Override
        protected ProtocolCodec createCodec() {
            return new LengthPrefixedCodec();
        }
        @Override
        protected void onCommand(Object command) {
            ClientInvokerImpl.this.onCommand(command);
        }
    }
New to GrepCode? Check out our FAQ X