Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * Copyright (C) FuseSource, Inc.
 * http://fusesource.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 package org.fusesource.fabric.dosgi.tcp;
 
 

 /**
  * @author Hiram Chirino
  */
 
 public class AsyncInvocationStrategy implements InvocationStrategy {
 
     /** Shared, eagerly created instance of this strategy. */
     public static final AsyncInvocationStrategy INSTANCE = new AsyncInvocationStrategy();
 
     static public boolean isAsyncMethod(Method method) {
         Class<?>[] types = method.getParameterTypes();
         return types.length != 0 && types[types.length - 1] == AsyncCallback.class;
     }
 
 
     private class AsyncResponseFuture implements ResponseFuture {
 
         private final ClassLoader loader;
         private final Method method;
         private final AsyncCallback callback;
         private final SerializationStrategy serializationStrategy;
         private final DispatchQueue queue;
 
         public AsyncResponseFuture(ClassLoader loaderMethod methodAsyncCallback callbackSerializationStrategy serializationStrategyDispatchQueue queue) {
             this. = loader;
             this. = method;
             this. = callback;
             this. = serializationStrategy;
             this. = queue;
         }
 
         public void set(final DataByteArrayInputStream source) {
             if!=null ) {
                 .execute(new Runnable() {
                     public void run() {
                         decodeIt(source);
                     }
                 });
             } else {
                 decodeIt(source);
             }
         }
 
         private void decodeIt(DataByteArrayInputStream source) {
             try {
                 .decodeResponse(getResultType(), source);
             } catch (Throwable e) {
                 e.printStackTrace();
             }
         }
 
         public Object get(long timeoutTimeUnit unitthrows InterruptedExceptionExecutionExceptionTimeoutException {
             // TODO: we could store the timeout so we can time out the async request...
             return null;
         }
 
         @Override
         public void fail(Throwable throwable) {
             .onFailure(throwable);
         }
     }
 
     public ResponseFuture request(SerializationStrategy serializationStrategyClassLoader loaderMethod methodObject[] argsDataByteArrayOutputStream targetthrows Exception {
         if(!isAsyncMethod(method)) {
            throw new IllegalArgumentException("Invalid async method declaration: last argument is not a RequestCallback");
        }
        Class[] new_types = payloadTypes(method);
        Object[] new_args = new Object[args.length-1];
        System.arraycopy(args, 0, new_args, 0, new_args.length);
        serializationStrategy.encodeRequest(loadernew_typesnew_argstarget);
        return new AsyncResponseFuture(loadermethod, (AsyncCallbackargs[args.length-1], serializationStrategy, Dispatch.getCurrentQueue());
    }
    static private Class<?>[] payloadTypes(Method method) {
        Class<?>[] types = method.getParameterTypes();
        Class<?>[] new_types = new Class<?>[types.length-1];
        System.arraycopy(types, 0, new_types, 0, new_types.length);
        return new_types;
    }
    static private Class getResultType(Method method) {
        Type[] types = method.getGenericParameterTypes();
        ParameterizedType t = (ParameterizedTypetypes[types.length-1];
        return (Classt.getActualTypeArguments()[0];
    }
    class ServiceResponse {
        private final ClassLoader loader;
        private final Method method;
        private final DataByteArrayOutputStream responseStream;
        private final Runnable onComplete;
        private final SerializationStrategy serializationStrategy;
        private final int pos;
        // Used to protect against sending multiple responses.
        final AtomicBoolean responded = new AtomicBoolean(false);
        public ServiceResponse(ClassLoader loaderMethod methodDataByteArrayOutputStream responseStreamRunnable onCompleteSerializationStrategy serializationStrategy) {
            this. = loader;
            this. = method;
            this. = responseStream;
            this. = onComplete;
            this. = serializationStrategy;
             = responseStream.position();
        }
        public void send(Throwable errorObject value) {
            if.compareAndSet(falsetrue) ) {
                Class resultType = getResultType();
                try {
                    .encodeResponse(resultTypevalueerror);
                } catch (Exception e) {
                    // we failed to encode the response.. reposition and write that error.
                    try {
                        .position();
                        .encodeResponse(resultTypevaluenew RemoteException(e.toString()), );
                    } catch (Exception unexpected) {
                        unexpected.printStackTrace();
                    }
                } finally {
                    .run();
                }
            }
        }
    }
    public void service(SerializationStrategy serializationStrategyClassLoader loaderMethod methodObject targetDataByteArrayInputStream requestStreamfinal DataByteArrayOutputStream responseStreamfinal Runnable onComplete) {
        final ServiceResponse helper = new ServiceResponse(loadermethodresponseStreamonCompleteserializationStrategy);
        try {
            Object[] new_args = new Object[method.getParameterTypes().length];
            serializationStrategy.decodeRequest(loaderpayloadTypes(method), requestStreamnew_args);
            new_args[new_args.length-1] = new AsyncCallback<Object>() {
                public void onSuccess(Object result) {
                    helper.send(nullresult);
                }
                public void onFailure(Throwable failure) {
                    helper.send(failurenull);
                }
            };
            method.invoke(targetnew_args);
        } catch (Throwable t) {
            helper.send(tnull);
        }
    }
New to GrepCode? Check out our FAQ X