Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright (C) 2011, FuseSource Corp. All rights reserved. http://fusesource.com The software in this package is published under the terms of the CDDL license a copy of which has been included with this distribution in the license.txt file.
  
  package org.fusesource.fabric.dosgi.tcp;
 
 

Author(s):
Hiram Chirino
 
 public class AsyncInvocationStrategy implements InvocationStrategy {
 
     public static final AsyncInvocationStrategy INSTANCE = new AsyncInvocationStrategy();
 
     static public boolean isAsyncMethod(Method method) {
         Class<?>[] types = method.getParameterTypes();
         return types.length != 0 && types[types.length - 1] == AsyncCallback.class;
     }
 
 
     private class AsyncResponseFuture implements ResponseFuture {
 
         private final ClassLoader loader;
         private final Method method;
         private final AsyncCallback callback;
         private final SerializationStrategy serializationStrategy;
         private final DispatchQueue queue;
 
         public AsyncResponseFuture(ClassLoader loaderMethod methodAsyncCallback callbackSerializationStrategy serializationStrategyDispatchQueue queue) {
             this. = loader;
             this. = method;
             this. = callback;
             this. = serializationStrategy;
             this. = queue;
         }
 
         public void set(final DataByteArrayInputStream source) {
             if!=null ) {
                 .execute(new Runnable() {
                     public void run() {
                         decodeIt(source);
                     }
                 });
             } else {
                 decodeIt(source);
             }
         }
 
         private void decodeIt(DataByteArrayInputStream source) {
             try {
                 .decodeResponse(getResultType(), source);
             } catch (Throwable e) {
                 e.printStackTrace();
             }
         }
 
         public Object get(long timeoutTimeUnit unitthrows InterruptedExceptionExecutionExceptionTimeoutException {
             // TODO: we could store the timeout so we can time out the async request...
             return null;
         }
 
     }
 
     public ResponseFuture request(SerializationStrategy serializationStrategyClassLoader loaderMethod methodObject[] argsDataByteArrayOutputStream targetthrows Exception {
         if(!isAsyncMethod(method)) {
             throw new IllegalArgumentException("Invalid async method declaration: last argument is not a RequestCallback");
         }
 
         Class[] new_types = payloadTypes(method);
         Object[] new_args = new Object[args.length-1];
         System.arraycopy(args, 0, new_args, 0, new_args.length);
 
         serializationStrategy.encodeRequest(loadernew_typesnew_argstarget);
 
         return new AsyncResponseFuture(loadermethod, (AsyncCallbackargs[args.length-1], serializationStrategy, Dispatch.getCurrentQueue());
     }
    static private Class<?>[] payloadTypes(Method method) {
        Class<?>[] types = method.getParameterTypes();
        Class<?>[] new_types = new Class<?>[types.length-1];
        System.arraycopy(types, 0, new_types, 0, new_types.length);
        return new_types;
    }
    static private Class getResultType(Method method) {
        Type[] types = method.getGenericParameterTypes();
        ParameterizedType t = (ParameterizedTypetypes[types.length-1];
        return (Classt.getActualTypeArguments()[0];
    }
    class ServiceResponse {
        private final ClassLoader loader;
        private final Method method;
        private final DataByteArrayOutputStream responseStream;
        private final Runnable onComplete;
        private final SerializationStrategy serializationStrategy;
        private final int pos;
        // Used to protect against sending multiple responses.
        final AtomicBoolean responded = new AtomicBoolean(false);
        public ServiceResponse(ClassLoader loaderMethod methodDataByteArrayOutputStream responseStreamRunnable onCompleteSerializationStrategy serializationStrategy) {
            this. = loader;
            this. = method;
            this. = responseStream;
            this. = onComplete;
            this. = serializationStrategy;
             = responseStream.position();
        }
        public void send(Throwable errorObject value) {
            if.compareAndSet(falsetrue) ) {
                Class resultType = getResultType();
                try {
                    .encodeResponse(resultTypevalueerror);
                } catch (Exception e) {
                    // we failed to encode the response.. reposition and write that error.
                    try {
                        .position();
                        .encodeResponse(resultTypevaluenew RemoteException(e.toString()), );
                    } catch (Exception unexpected) {
                        unexpected.printStackTrace();
                    }
                } finally {
                    .run();
                }
            }
        }
    }
    public void service(SerializationStrategy serializationStrategyClassLoader loaderMethod methodObject targetDataByteArrayInputStream requestStreamfinal DataByteArrayOutputStream responseStreamfinal Runnable onComplete) {
        final ServiceResponse helper = new ServiceResponse(loadermethodresponseStreamonCompleteserializationStrategy);
        try {
            Object[] new_args = new Object[method.getParameterTypes().length];
            serializationStrategy.decodeRequest(loaderpayloadTypes(method), requestStreamnew_args);
            new_args[new_args.length-1] = new AsyncCallback<Object>() {
                public void onSuccess(Object result) {
                    helper.send(nullresult);
                }
                public void onFailure(Throwable failure) {
                    helper.send(failurenull);
                }
            };
            method.invoke(targetnew_args);
        } catch (Throwable t) {
            helper.send(tnull);
        }
    }
New to GrepCode? Check out our FAQ X