Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package com.gh.bmd.jrt.core;
  
  
  import java.util.List;
  
  
  
  import static com.gh.bmd.jrt.util.TimeDuration.ZERO;
  import static com.gh.bmd.jrt.util.TimeDuration.fromUnit;

Default implementation of an invocation input channel.

Created by davide-maestroni on 11/06/15.

Parameters:
<INPUT> the input data type.
<OUTPUT> the output data type.
  
  class DefaultInvocationChannel<INPUT, OUTPUT> implements InvocationChannel<INPUT, OUTPUT> {
  
      private final ArrayList<OutputChannel<?>> mBoundChannels = new ArrayList<OutputChannel<?>>();
  
      private final DefaultExecution<INPUT, OUTPUT> mExecution;
  
      private final Check mHasInputs;
  
      private final NestedQueue<INPUT> mInputQueue;
  
      private final TimeDuration mInputTimeout;
  
      private final Logger mLogger;
  
      private final int mMaxInput;
  
      private final Object mMutex = new Object();
  
      private final DefaultResultChannel<OUTPUT> mResultChanel;
  
      private final Runner mRunner;
  
      private RoutineException mAbortException;
  
      private int mInputCount;
  
      private TimeDuration mInputDelay = ;
  
      private OrderType mInputOrder;
  
      private boolean mIsConsuming;
  
      private boolean mIsPendingExecution;
  
      private int mPendingExecutionCount;
  
      private InputChannelState mState;

    
Constructor.

Parameters:
configuration the invocation configuration.
manager the invocation manager.
runner the runner instance.
logger the logger instance.
 
             @Nonnull final InvocationManager<INPUT, OUTPUT> manager, @Nonnull final Runner runner,
             @Nonnull final Logger logger) {
 
          = logger.subContextLogger(this);
          = runner;
          = configuration.getInputOrderTypeOr(.);
          = configuration.getInputMaxSizeOr(.);
          = configuration.getInputTimeoutOr();
          = new NestedQueue<INPUT>() {
 
             @Override
             public void close() {
 
                 // prevents closing
             }
         };
         final int maxInputSize = ;
          = new Check() {
 
             public boolean isTrue() {
 
                 return ( <= maxInputSize);
             }
         };
          = new DefaultResultChannel<OUTPUT>(configurationnew AbortHandler() {
 
             public void onAbort(@Nullable final RoutineException reasonfinal long delay,
                     @Nonnull final TimeUnit timeUnit) {
 
                 final Execution execution;
 
                 synchronized () {
 
                     execution = .onHandlerAbort(reason);
                 }
 
                 if (execution != null) {
 
                     .run(executiondelaytimeUnit);
                 }
             }
         }, runnerlogger);
          = new DefaultExecution<INPUT, OUTPUT>(managernew DefaultInputIterator(),
                                                          logger);
          = new InputChannelState();
     }
 
     public boolean abort() {
 
         return abort(null);
     }
 
     public boolean abort(@Nullable final Throwable reason) {
 
         final TimeDuration delay;
         final Execution execution;
 
         synchronized () {
 
             delay = ;
             execution = .abortInvocation(reason);
         }
 
         if (execution != null) {
 
             .run(executiondelay.timedelay.unit);
             return true;
         }
 
         return false;
     }
 
     public boolean isOpen() {
 
         synchronized () {
 
             return .isChannelOpen();
         }
     }
 
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> after(@Nonnull final TimeDuration delay) {
 
         synchronized () {
 
             .after(delay);
         }
 
         return this;
     }
 
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> after(final long delay,
             @Nonnull final TimeUnit timeUnit) {
 
         return after(fromUnit(delaytimeUnit));
     }
 
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> now() {
 
         return after();
     }
 
     // NOTE(review): the lock operand, the receiver of orderBy() and the order constant are
     // missing from this listing. Presumably mMutex, mState and an OrderType value - confirm
     // against the original source before restoring them.
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> orderByCall() {
 
         synchronized () {
 
             .orderBy(.);
         }
 
         return this;
     }
 
     // NOTE(review): the lock operand, the receiver of orderBy() and the order constant are
     // missing from this listing. Presumably mMutex, mState and an OrderType value - confirm
     // against the original source before restoring them.
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> orderByChance() {
 
         synchronized () {
 
             .orderBy(.);
         }
 
         return this;
     }
 
     // NOTE(review): the lock operand, the receiver of orderBy() and the order constant are
     // missing from this listing. Presumably mMutex, mState and an OrderType value - confirm
     // against the original source before restoring them.
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> orderByDelay() {
 
         synchronized () {
 
             .orderBy(.);
         }
 
         return this;
     }
 
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> pass(
             @Nullable final OutputChannel<? extends INPUT> channel) {
 
         final OutputConsumer<INPUT> consumer;
 
         synchronized () {
 
             consumer = .pass(channel);
         }
 
         if ((consumer != null) && (channel != null)) {
 
             channel.passTo(consumer);
         }
 
         return this;
     }
 
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> pass(@Nullable final Iterable<? extends INPUT> inputs) {
 
         final Execution execution;
         final TimeDuration delay;
 
         synchronized () {
 
             delay = ;
             execution = .pass(inputs);
         }
 
         if (execution != null) {
 
             .run(executiondelay.timedelay.unit);
         }
 
         return this;
     }
 
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> pass(@Nullable final INPUT input) {
 
         final TimeDuration delay;
         final Execution execution;
 
         synchronized () {
 
             delay = ;
             execution = .pass(input);
         }
 
         if (execution != null) {
 
             .run(executiondelay.timedelay.unit);
         }
 
         return this;
     }
 
     @Nonnull
     public InvocationChannel<INPUT, OUTPUT> pass(@Nullable final INPUT... inputs) {
 
         final TimeDuration delay;
         final Execution execution;
 
         synchronized () {
 
             delay = ;
             execution = .pass(inputs);
         }
 
         if (execution != null) {
 
             .run(executiondelay.timedelay.unit);
         }
 
         return this;
     }
 
     @Nonnull
     public OutputChannel<OUTPUT> result() {
 
         final TimeDuration delay;
         final Execution execution;
         final OutputChannel<OUTPUT> result;
 
         synchronized () {
 
             final InputChannelState state = ;
             delay = ;
             execution = state.onResult();
             result = state.getOutputChannel();
         }
 
         if (execution != null) {
 
             .run(executiondelay.timedelay.unit);
         }
 
         return result;
     }
 
     private void waitInputs(final int count) {
 
         if (.isZero()) {
 
              -= count;
             throw new InputTimeoutException("timeout while waiting for room in the input channel");
         }
 
         if (.isExecutionThread()) {
 
              -= count;
             throw new InputDeadlockException("cannot wait on the invocation runner thread");
         }
 
         try {
 
             if (!.waitTrue()) {
 
                  -= count;
                 throw new InputTimeoutException(
                         "timeout while waiting for room in the input channel");
             }
 
         } catch (final InterruptedException e) {
 
              -= count;
             throw new InvocationInterruptedException(e);
         }
     }

    
Interface defining an object managing the creation and the recycling of invocation instances.

Parameters:
<INPUT> the input data type.
<OUTPUT> the output data type.
 
     interface InvocationManager<INPUT, OUTPUT> {

        
Creates a new invocation instance.

Parameters:
observer the invocation observer.
 
         void create(@Nonnull InvocationObserver<INPUT, OUTPUT> observer);

        
Discards the specified invocation.

Parameters:
invocation the invocation instance.
 
         void discard(@Nonnull Invocation<INPUT, OUTPUT> invocation);

        
Recycles the specified invocation.

Parameters:
invocation the invocation instance.
 
         void recycle(@Nonnull Invocation<INPUT, OUTPUT> invocation);
     }

    
Interface defining an observer of invocation instances.

Parameters:
<INPUT> the input data type.
<OUTPUT> the output data type.
 
     interface InvocationObserver<INPUT, OUTPUT> {

        
Called when a new invocation instances is available.

Parameters:
invocation the invocation.
 
         void onCreate(@Nonnull Invocation<INPUT, OUTPUT> invocation);

        
Called when an error occurs during the invocation instantiation.

Parameters:
error the error.
 
         void onError(@Nonnull Throwable error);
     }

    
Implementation of an execution handling the abortion of the result channel.
 
     private class AbortResultExecution extends TemplateExecution {
 
         private final Throwable mAbortException;

        
Constructor.

Parameters:
reason the reason of the abortion.
 
         private AbortResultExecution(@Nullable final Throwable reason) {
 
              = reason;
         }
 
         public void run() {
 
             .close();
         }
     }

    
The invocation has been explicitly aborted.
 
     // State whose getOutputChannel() never returns a channel: it always throws through the
     // exception() helper of the parent state.
     private class AbortedChannelState extends ExceptionChannelState {
 
         @Nonnull
         @Override
         OutputChannel<OUTPUT> getOutputChannel() {
 
             // exception() is declared private in ExceptionChannelState, hence the explicit super
             throw super.exception();
         }
     }

    
Default implementation of an input iterator.
 
     private class DefaultInputIterator implements InputIterator<INPUT> {
 
         @Nullable
         public RoutineException getAbortException() {
 
             synchronized () {
 
                 return .getAbortException();
             }
         }
 
         public boolean hasInput() {
 
             synchronized () {
 
                 return .hasInput();
             }
         }
 
         public boolean isAborting() {
 
             synchronized () {
 
                 return .isAborting();
             }
         }
 
         @Nullable
         public INPUT nextInput() {
 
             synchronized () {
 
                 return .nextInput();
             }
         }
 
         public void onAbortComplete() {
 
             final Throwable abortException;
             final List<OutputChannel<?>> channels;
 
             synchronized () {
 
                 abortException = ;
                 .dbg(abortException"aborting bound channels [%d]".size());
                 channels = new ArrayList<OutputChannel<?>>();
                 .clear();
             }
 
             for (final OutputChannel<?> channel : channels) {
 
                 channel.abort(abortException);
             }
         }
 
         public boolean onConsumeComplete() {
 
             synchronized () {
 
                 return .onConsumeComplete();
             }
         }
 
         public void onConsumeStart() {
 
             synchronized () {
 
                 .onConsumeStart();
             }
         }
 
         public void onInvocationComplete() {
 
             synchronized () {
 
                 .onInvocationComplete();
             }
         }
     }

    
Default implementation of an output consumer pushing the data to consume into the input channel queue.
 
     // NOTE(review): many identifiers are missing from this listing (assignment targets, lock
     // operands, call receivers, enum constants). The statements are kept verbatim; the notes
     // below are assumptions to be confirmed against the original source.
     private class DefaultOutputConsumer implements OutputConsumer<INPUT> {
 
         private final TimeDuration mDelay;
 
         private final OrderType mOrderType;
 
         private final NestedQueue<INPUT> mQueue;

        
Constructor.
 
         private DefaultOutputConsumer() {
 
             // presumably captures the channel input delay and order into mDelay / mOrderType
             final TimeDuration delay = ( = );
             final OrderType order = ( = );
             // picks either a nested queue or another queue based on order and zero delay;
             // the compared OrderType constants cannot be recovered from this listing
              = ((order == .) || ((order == .)
                     && delay.isZero())) ? .addNested() : ;
         }
 
         public void onComplete() {
 
             final Execution execution;
 
             synchronized () {
 
                 execution = .onConsumerComplete();
             }
 
             if (execution != null) {
 
                 // zero delay; the time unit constant is missing from this listing
                 .run(execution, 0, .);
             }
         }
 
         public void onError(@Nullable final RoutineException error) {
 
             final Execution execution;
 
             synchronized () {
 
                 execution = .onConsumerError(error);
             }
 
             if (execution != null) {
 
                 // NOTE(review): the delay source is missing here - verify which field is read
                 final TimeDuration delay = ;
                 .run(executiondelay.timedelay.unit);
             }
         }
 
         public void onOutput(final INPUT output) {
 
             final Execution execution;
             final TimeDuration delay = ;
 
             synchronized () {
 
                 execution = .onConsumerOutput(outputdelay);
             }
 
             if (execution != null) {
 
                 .run(executiondelay.timedelay.unit);
             }
         }
     }

    
Implementation of an execution handling a delayed abortion.
 
     private class DelayedAbortExecution extends TemplateExecution {
 
         private final RoutineException mAbortException;

        
Constructor.

Parameters:
reason the reason of the abortion.
 
         private DelayedAbortExecution(@Nullable final RoutineException reason) {
 
              = reason;
         }
 
         public void run() {
 
             final Execution execution;
 
             synchronized () {
 
                 execution = .delayedAbortInvocation();
             }
 
             if (execution != null) {
 
                 execution.run();
             }
         }
     }

    
Implementation of an execution handling a delayed input.
 
     private class DelayedInputExecution extends TemplateExecution {
 
         private final INPUT mInput;
 
         private final NestedQueue<INPUT> mQueue;

        
Constructor.

Parameters:
queue the input queue.
input the input.
 
         private DelayedInputExecution(@Nonnull final NestedQueue<INPUT> queue,
                 @Nullable final INPUT input) {
 
              = queue;
              = input;
         }
 
         public void run() {
 
             final Execution execution;
 
             synchronized () {
 
                 execution = .delayedInput();
             }
 
             if (execution != null) {
 
                 execution.run();
             }
         }
     }

    
Implementation of an execution handling a delayed input of a list of data.
 
     private class DelayedListInputExecution extends TemplateExecution {
 
         private final ArrayList<INPUT> mInputs;
 
         private final NestedQueue<INPUT> mQueue;

        
Constructor.

Parameters:
queue the input queue.
inputs the list of input data.
 
         private DelayedListInputExecution(@Nonnull final NestedQueue<INPUT> queue,
                 final ArrayList<INPUT> inputs) {
 
              = inputs;
              = queue;
         }
 
         public void run() {
 
             final Execution execution;
 
             synchronized () {
 
                 execution = .delayedInputs();
             }
 
             if (execution != null) {
 
                 execution.run();
             }
         }
     }

    
Exception thrown during invocation execution.
 
     private class ExceptionChannelState extends ResultChannelState {
 
         private final Logger mSubLogger = .subContextLogger(this);
 
         @Override
         void after(@Nonnull final TimeDuration delay) {
 
             throw exception();
         }
 
         @Nullable
         @Override
         Execution onConsumerOutput(final INPUT input, @Nonnull final NestedQueue<INPUT> queue,
                 @Nonnull final TimeDuration delay, @Nonnull final OrderType orderType) {
 
             throw consumerException();
         }
 
         @Nullable
         @Override
         Execution onHandlerAbort(@Nullable final RoutineException reason) {
 
             .wrn("avoiding aborting result channel since invocation is aborted");
             return null;
         }
 
         @Nullable
         @Override
         Execution onConsumerComplete(@Nonnull final NestedQueue<INPUT> queue) {
 
             throw consumerException();
         }
 
         @Nonnull
         private RoutineException consumerException() {
 
             final RoutineException abortException = ;
             .dbg(abortException"consumer abort exception");
             return abortException;
         }
 
         @Nonnull
         private RoutineException exception() {
 
             final RoutineException abortException = ;
             .dbg(abortException"abort exception");
             throw abortException;
         }
 
         @Override
         void orderBy(@Nonnull final OrderType orderType) {
 
             throw exception();
         }
 
         @Override
         public boolean isAborting() {
 
             return true;
         }
 
         @Override
         public void onInvocationComplete() {
 
         }
 
         @Nullable
         @Override
         OutputConsumer<INPUT> pass(@Nullable final OutputChannel<? extends INPUT> channel) {
 
             throw exception();
         }
 
         @Nullable
         @Override
         Execution pass(@Nullable final Iterable<? extends INPUT> inputs) {
 
             throw exception();
         }
 
         @Nullable
         @Override
         Execution pass(@Nullable final INPUT input) {
 
             throw exception();
         }
 
         @Nullable
         @Override
         Execution pass(@Nullable final INPUT... inputs) {
 
             throw exception();
         }
 
         @Nonnull
         @Override
         OutputChannel<OUTPUT> getOutputChannel() {
 
              = new AbortedChannelState();
             final OutputChannel<OUTPUT> outputChannel = .getOutput();
             outputChannel.abort();
             return outputChannel;
         }
     }

    
Invocation channel internal state (using "state" design pattern).
 
     private class InputChannelState implements InputIterator<INPUT> {
 
         private final Logger mSubLogger = .subContextLogger(this);

        
Called when the invocation is aborted.

Parameters:
reason the reason of the abortion.
Returns:
the execution to run or null.
 
         // NOTE(review): assignment targets and call receivers are missing from this listing.
         // In particular the receiver (and any argument) of cancel() cannot be told from here;
         // confirm against the original source before restoring.
         @Nullable
         Execution abortInvocation(@Nullable final Throwable reason) {
 
             final RoutineException abortException = AbortException.wrapIfNeeded(reason);
 
             // presumably tests the current input delay - TODO confirm
             if (.isZero()) {
 
                 .dbg(reason"aborting channel");
                  = abortException;
                  = new AbortedChannelState();
                 .cancel();
                 return .abort();
             }
 
             // otherwise the abortion is deferred through a DelayedAbortExecution
             return new DelayedAbortExecution(abortException);
         }

        
Called to set the specified input delay.

Parameters:
delay the delay.
 
         @SuppressWarnings("ConstantConditions")
         void after(@Nonnull final TimeDuration delay) {
 
             if (delay == null) {
 
                 .err("invalid null delay");
                 throw new NullPointerException("the input delay must not be null");
             }
 
              = delay;
         }

        
Called when the invocation is aborted after a delay.

Parameters:
reason the reason of the abortion.
Returns:
the execution to run or null.
 
         // NOTE(review): assignment targets and call receivers are missing from this listing;
         // the receiver (and any argument) of cancel() cannot be told from here. The visible
         // statements mirror the zero-delay branch of abortInvocation().
         @Nullable
         Execution delayedAbortInvocation(@Nullable final RoutineException reason) {
 
             .dbg(reason"aborting channel");
              = reason;
              = new AbortedChannelState();
             .cancel();
             return .abort();
         }

        
Called when an input is passed to the invocation after a delay.

Parameters:
queue the input queue.
input the input.
Returns:
the execution to run or null.
 
         @Nullable
         Execution delayedInput(@Nonnull final NestedQueue<INPUT> queue,
                 @Nullable final INPUT input) {
 
             .dbg("delayed input execution: %s"input);
             queue.add(input);
             queue.close();
             return ;
         }

        
Called when some inputs are passed to the invocation after a delay.

Parameters:
queue the input queue.
inputs the inputs.
Returns:
the execution to run or null.
 
         @Nullable
         Execution delayedInputs(@Nonnull final NestedQueue<INPUT> queuefinal List<INPUT> inputs) {
 
             .dbg("delayed input execution: %s"inputs);
             queue.addAll(inputs);
             queue.close();
             return ;
         }

        
Called to get the output channel.

Returns:
the output channel.
 
         @Nonnull
         OutputChannel<OUTPUT> getOutputChannel() {
 
              = new OutputChannelState();
             return .getOutput();
         }

        
Called to know if the channel is open.

Returns:
whether the channel is open.
 
         // This base state always reports the channel as open.
         boolean isChannelOpen() {
 
             return true;
         }

        
Called when the feeding consumer completes.

Parameters:
queue the input queue.
Returns:
the execution to run or null.
 
         // NOTE(review): the logger receiver, both negated flags, the assigned flag, the returned
         // execution and the decremented counter are missing from this listing. Which of the
         // enclosing class's boolean/int fields is meant at each spot cannot be told from here.
         @Nullable
         Execution onConsumerComplete(@Nonnull final NestedQueue<INPUT> queue) {
 
             .dbg("closing consumer");
             queue.close();
 
             if (! && !) {
 
                  = true;
                 return ;
 
             } else {
 
                 --;
             }
 
             return null;
         }

        
Called when the feeding consumer receives an error.

Parameters:
error the error.
Returns:
the execution to run or null.
 
         // NOTE(review): assignment targets and call receivers are missing from this listing;
         // the receiver (and any argument) of cancel() cannot be told from here. Unlike the
         // abort methods, the new state created here is an ExceptionChannelState.
         @Nullable
         Execution onConsumerError(@Nullable final RoutineException error) {
 
             .dbg("aborting consumer");
              = error;
              = new ExceptionChannelState();
             .cancel();
             return .abort();
         }

        
Called when the feeding consumer receives an output.

Parameters:
input the input.
queue the input queue.
delay the input delay.
orderType the input order type.
Returns:
the execution to run or null.
        // NOTE(review): counters, flags, receivers, the OrderType constant and several commas are
        // missing from this listing. The visible shape is: bump a counter, wait for room when the
        // check fails, re-dispatch when a compared reference is no longer this instance, then
        // either enqueue immediately (zero delay) or return a DelayedInputExecution.
        @Nullable
        Execution onConsumerOutput(final INPUT input, @Nonnull final NestedQueue<INPUT> queue,
                @Nonnull final TimeDuration delay, @Nonnull final OrderType orderType) {
            .dbg("consumer input [#%d+1]: %s [%s]"inputdelay);
            ++;
            if (!.isTrue()) {
                waitInputs(1);
                // presumably compares the current state with this instance - TODO confirm
                if ( != this) {
                    --;
                    return .onConsumerOutput(inputqueuedelayorderType);
                }
            }
            if (delay.isZero()) {
                queue.add(input);
                if (!) {
                    ++;
                     = true;
                    return ;
                }
                return null;
            }
            ++;
            return new DelayedInputExecution(
                    (orderType != .) ? queue.addNested() : queueinput);
        }

        
Called when the invocation is aborted through the registered handler.

Parameters:
reason the reason of the abortion.
Returns:
the execution to run or null.
        // NOTE(review): assignment targets and call receivers are missing from this listing;
        // the receiver (and any argument) of cancel() cannot be told from here.
        @Nullable
        Execution onHandlerAbort(@Nullable final RoutineException reason) {
            .dbg("aborting result channel");
             = reason;
             = new ExceptionChannelState();
            .cancel();
            return .abort();
        }

        
Called when the inputs are complete.

Returns:
the execution to run or null.
        // NOTE(review): the logger receiver, both negated flags, the incremented counter, the
        // assigned flag and the returned execution are missing from this listing; which fields
        // of the enclosing class are meant must be confirmed against the original source.
        @Nullable
        Execution onResult() {
            .dbg("closing input channel");
            if (! && !) {
                ++;
                 = true;
                return ;
            }
            return null;
        }

        
Called to set the input delivery order.

Parameters:
orderType the input order type.
        void orderBy(@Nonnull final OrderType orderType) {
             = orderType;
        }

        
Called when an output channel is passed to the invocation.

Parameters:
channel the channel instance.
Returns:
the output consumer to bind or null.
        // NOTE(review): the logger receiver, the collection receiving the channel and the
        // incremented counter are missing from this listing. Presumably the channel is recorded
        // in mBoundChannels - confirm. A null channel is logged and ignored.
        @Nullable
        OutputConsumer<INPUT> pass(@Nullable final OutputChannel<? extends INPUT> channel) {
            if (channel == null) {
                .wrn("passing null channel");
                return null;
            }
            .add(channel);
            ++;
            .dbg("passing channel: %s"channel);
            return new DefaultOutputConsumer();
        }

        
Called when some inputs are passed to the invocation.

Parameters:
inputs the inputs.
Returns:
the execution to run or null.
        // NOTE(review): the size limit, counters, flags, receivers, the OrderType constant and
        // several commas are missing from this listing; statements are kept verbatim.
        @Nullable
        Execution pass(@Nullable final Iterable<? extends INPUT> inputs) {
            if (inputs == null) {
                .wrn("passing null iterable");
                return null;
            }
            // the iterable is copied into a list before its size is checked
            final ArrayList<INPUT> list = new ArrayList<INPUT>();
            for (final INPUT input : inputs) {
                list.add(input);
            }
            final int size = list.size();
            // the size violation is reported as an InputTimeoutException without any waiting
            if (size > ) {
                throw new InputTimeoutException(
                        "inputs exceed maximum channel size [" + size + "/" +  + "]");
            }
            final TimeDuration delay = ;
            .dbg("passing iterable [#%d+%d]: %s [%s]"sizeinputsdelay);
             += size;
            if (!.isTrue()) {
                waitInputs(size);
                // presumably compares the current state with this instance - TODO confirm
                if ( != this) {
                     -= size;
                    return .pass(inputs);
                }
            }
            if (delay.isZero()) {
                .addAll(list);
                if (!) {
                    ++;
                     = true;
                    return ;
                }
                return null;
            }
            ++;
            return new DelayedListInputExecution(
                    ( != .) ? .addNested() : ,
                    list);
        }

        
Called when an input is passed to the invocation.

Parameters:
input the input.
Returns:
the execution to run or null.
        // NOTE(review): the delay source, counters, flags, receivers, the OrderType constant and
        // several commas are missing from this listing; statements are kept verbatim.
        @Nullable
        Execution pass(@Nullable final INPUT input) {
            final TimeDuration delay = ;
            .dbg("passing input [#%d+1]: %s [%s]"inputdelay);
            ++;
            if (!.isTrue()) {
                waitInputs(1);
                // presumably compares the current state with this instance - TODO confirm
                if ( != this) {
                    --;
                    return .pass(input);
                }
            }
            if (delay.isZero()) {
                .add(input);
                if (!) {
                    ++;
                     = true;
                    return ;
                }
                return null;
            }
            ++;
            return new DelayedInputExecution(
                    ( != .) ? .addNested() : ,
                    input);
        }

        
Called when some inputs are passed to the invocation.

Parameters:
inputs the inputs.
Returns:
the execution to run or null.
        @Nullable
        Execution pass(@Nullable final INPUT... inputs) {
            if (inputs == null) {
                .wrn("passing null input array");
                return null;
            }
            final int size = inputs.length;
            if (size > ) {
                throw new InputTimeoutException(
                        "inputs exceed maximum channel size [" + size + "/" +  + "]");
            }
            final TimeDuration delay = ;
            .dbg("passing array [#%d+%d]: %s [%s]"sizeinputsdelay);
             += size;
            if (!.isTrue()) {
                waitInputs(size);
                if ( != this) {
                     -= size;
                    return .pass(inputs);
                }
            }
            final ArrayList<INPUT> list = new ArrayList<INPUT>(size);
            Collections.addAll(listinputs);
            if (delay.isZero()) {
                .addAll(list);
                if (!) {
                    ++;
                     = true;
                    return ;
                }
                return null;
            }