Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package com.gh.bmd.jrt.routine;
  
  
  import java.util.Arrays;
  import java.util.List;
  
  
  
  import static com.gh.bmd.jrt.time.TimeDuration.INFINITY;
  import static com.gh.bmd.jrt.time.TimeDuration.ZERO;
  import static com.gh.bmd.jrt.time.TimeDuration.fromUnit;

Class handling the routine output.

This class centralizes the managing of data passing through the routine output and result channels, since, logically, the two objects are part of the same entity. In fact, on one end the result channel puts data into the output queue and, on the other end, the output channel reads them from the same queue.

Created by davide on 9/24/14.

Parameters:
<OUTPUT> the output data type.
  
  class DefaultResultChannel<OUTPUT> implements ResultChannel<OUTPUT> {
  
      private static final WeakIdentityHashMap<OutputConsumer<?>, ObjectsMutexMap =
              new WeakIdentityHashMap<OutputConsumer<?>, Object>();
  
      private final ArrayList<OutputChannel<?>> mBoundChannels = new ArrayList<OutputChannel<?>>();
  
      private final Object mFlushMutex = new Object();
  
      private final AbortHandler mHandler;
  
      private final Check mHasOutputs;
  
      private final Logger mLogger;
  
      private final int mMaxOutput;
  
      private final Object mMutex = new Object();
  
      private final TimeDuration mOutputTimeout;
  
      private final TimeDuration mReadTimeout;
  
      private final Runner mRunner;
  
      private final TimeoutAction mTimeoutAction;
  
      private Throwable mAbortException;
  
      private Object mConsumerMutex;
  
      private boolean mIsException;
  
      private OutputConsumer<? super OUTPUT> mOutputConsumer;
  
      private int mOutputCount;
  
      private Check mOutputHasNext;
 
     private Check mOutputNotEmpty;
 
     private NestedQueue<ObjectmOutputQueue;
 
     private int mPendingOutputCount;
 
     private TimeDuration mResultDelay = ;
 
     private ChannelState mState = .;

    
Constructor.

Parameters:
configuration the routine configuration.
handler the abort handler.
runner the runner instance.
logger the logger instance.
Throws:
java.lang.NullPointerException if one of the parameters is null.
 
     @SuppressWarnings("ConstantConditions")
     DefaultResultChannel(@Nonnull final RoutineConfiguration configuration,
             @Nonnull final AbortHandler handler, @Nonnull final Runner runner,
             @Nonnull final Logger logger) {
 
         if (handler == null) {
 
             throw new NullPointerException("the abort handler must not be null");
         }
 
         if (runner == null) {
 
             throw new NullPointerException("the runner instance must not be null");
         }
 
          = logger.subContextLogger(this);
          = handler;
          = runner;
          = configuration.getReadTimeoutOr();
          = configuration.getReadTimeoutActionOr(.);
          = configuration.getOutputSizeOr(.);
          = configuration.getOutputTimeoutOr();
          = (configuration.getOutputOrderOr(.) == .)
                 ? new SimpleNestedQueue<Object>() : new OrderedNestedQueue<Object>();
 
         final int maxOutputSize = ;
          = new Check() {
 
             public boolean isTrue() {
 
                 return ( <= maxOutputSize);
             }
         };
     }
 
     @Nonnull
     private static Object getMutex(@Nonnull final OutputConsumer<?> consumer) {
 
         synchronized () {
 
             final WeakIdentityHashMap<OutputConsumer<?>, ObjectmutexMap = ;
             Object mutex = mutexMap.get(consumer);
 
             if (mutex == null) {
 
                 mutex = new Object();
                 mutexMap.put(consumermutex);
             }
 
             return mutex;
         }
     }
 
     public boolean abort() {
 
         return abort(null);
     }
 
     @Nonnull
     @SuppressWarnings("ConstantConditions")
     public ResultChannel<OUTPUT> after(@Nonnull final TimeDuration delay) {
 
         synchronized () {
 
             verifyOutput();
 
             if (delay == null) {
 
                 .err("invalid null delay");
                 throw new NullPointerException("the input delay must not be null");
             }
 
              = delay;
         }
 
         return this;
     }
 
     @Nonnull
     public ResultChannel<OUTPUT> after(final long delay, @Nonnull final TimeUnit timeUnit) {
 
         return after(fromUnit(delaytimeUnit));
     }
 
     @Nonnull
     public ResultChannel<OUTPUT> now() {
 
         return after();
     }
 
     @Nonnull
     public ResultChannel<OUTPUT> pass(@Nullable final OutputChannel<? extends OUTPUT> channel) {
 
         final TimeDuration delay;
         final DefaultOutputConsumer consumer;
 
         synchronized () {
 
             verifyOutput();
 
             if (channel == null) {
 
                 .wrn("passing null channel");
                 return this;
             }
 
             .add(channel);
             delay = ;
             ++;
             .dbg("passing channel: %s"channel);
 
             consumer = new DefaultOutputConsumer(delay);
         }
 
         channel.bind(consumer);
 
         return this;
     }
 
     @Nonnull
     public ResultChannel<OUTPUT> pass(@Nullable final Iterable<? extends OUTPUT> outputs) {
 
         NestedQueue<ObjectoutputQueue;
         ArrayList<OUTPUT> list = null;
         final TimeDuration delay;
 
         synchronized () {
 
             verifyOutput();
 
             if (outputs == null) {
 
                 .wrn("passing null iterable");
                 return this;
             }
 
             outputQueue = ;
             delay = ;
 
             int count = 0;
 
             if (delay.isZero()) {
 
                 for (final OUTPUT output : outputs) {
 
                     outputQueue.add(output);
                     ++count;
                 }
 
             } else {
 
                 outputQueue = outputQueue.addNested();
                 list = new ArrayList<OUTPUT>();
 
                 for (final OUTPUT output : outputs) {
 
                     list.add(output);
                 }
 
                 count = list.size();
                 ++;
             }
 
             .dbg("passing iterable [#%d+%d]: %s [%s]"countoutputsdelay);
             addOutputs(count);
         }
 
         if (delay.isZero()) {
 
             flushOutput(false);
 
         } else {
 
             .run(new DelayedListOutputExecution(outputQueuelist), delay.timedelay.unit);
         }
 
         return this;
     }
 
     @Nonnull
     public ResultChannel<OUTPUT> pass(@Nullable final OUTPUT output) {
 
         NestedQueue<ObjectoutputQueue;
         final TimeDuration delay;
 
         synchronized () {
 
             verifyOutput();
 
             outputQueue = ;
             delay = ;
 
             if (delay.isZero()) {
 
                 outputQueue.add(output);
 
             } else {
 
                 outputQueue = outputQueue.addNested();
                 ++;
             }
 
             .dbg("passing output [#%d+1]: %s [%s]"outputdelay);
             addOutputs(1);
         }
 
         if (delay.isZero()) {
 
             flushOutput(false);
 
         } else {
 
             .run(new DelayedOutputExecution(outputQueueoutput), delay.timedelay.unit);
         }
 
         return this;
     }
 
     @Nonnull
     public ResultChannel<OUTPUT> pass(@Nullable final OUTPUT... outputs) {
 
         synchronized () {
 
             verifyOutput();
 
             if (outputs == null) {
 
                 .wrn("passing null output array");
                 return this;
             }
         }
 
         return pass(Arrays.asList(outputs));
     }

    
Aborts immediately the execution.

Parameters:
throwable the reason of the abortion.
See also:
com.gh.bmd.jrt.channel.Channel.abort(java.lang.Throwable)
 
     void abortImmediately(@Nullable final Throwable throwable) {
 
         abort(throwabletrue);
     }

    
Closes this channel with the specified exception.

Parameters:
throwable the exception.
 
     void close(@Nullable final Throwable throwable) {
 
         final ArrayList<OutputChannel<?>> channels;
 
         synchronized () {
 
             .dbg(throwable"aborting result channel");
 
             channels = new ArrayList<OutputChannel<?>>();
             .clear();
             .add(RoutineExceptionWrapper.wrap(throwable));
              = true;
 
             if ( == null) {
 
                  = throwable;
             }
 
              = .;
             .notifyAll();
         }
 
         for (final OutputChannel<?> channel : channels) {
 
             channel.abort(throwable);
         }
 
         flushOutput(false);
     }

    
Closes this channel successfully.
 
     void close() {
 
         boolean isFlush = false;
 
         synchronized () {
 
             .dbg("closing result channel [#%d]");
 
             if ( == .) {
 
                 isFlush = true;
 
                 if ( > 0) {
 
                      = .;
 
                 } else {
 
                      = .;
                 }
 
             } else {
 
                 .dbg("avoiding closing result channel since already closed");
             }
         }
 
         if (isFlush) {
 
             flushOutput(false);
         }
     }

    
Returns the output channel reading the data pushed into this channel.

Returns:
the output channel.
 
     @Nonnull
     OutputChannel<OUTPUT> getOutput() {
 
         final TimeoutAction action = ;
         final OutputChannel<OUTPUT> outputChannel =
                 new DefaultOutputChannel().afterMax();
 
         if (action == .) {
 
             outputChannel.eventuallyExit();
 
         } else if (action == .) {
 
             outputChannel.eventuallyAbort();
         }
 
         return outputChannel;
     }
 
     private boolean abort(@Nullable final Throwable throwablefinal boolean isImmediate) {
 
         final TimeDuration delay;
 
         synchronized () {
 
             if (isResultComplete()) {
 
                 .dbg(throwable"avoiding aborting since channel is closed");
                 return false;
             }
 
             delay = (isImmediate) ?  : ;
 
             if (delay.isZero()) {
 
                 .dbg(throwable"aborting channel");
                 .clear();
                  = true;
                  = (isImmediate || (throwable instanceof AbortException)) ? throwable
                         : new AbortException(throwable);
                  = .;
             }
         }
 
         if (delay.isZero()) {
 
             .onAbort(throwable, 0, .);
 
         } else {
 
             .run(new DelayedAbortExecution(throwable), delay.timedelay.unit);
         }
 
         return true;
     }
 
     private void addOutputs(final int count) {
 
          += count;
 
         try {
 
             if (!.waitTrue()) {
 
                 throw new OutputDeadlockException(
                         "deadlock while waiting for room in the output channel");
             }
 
         } catch (final InterruptedException e) {
 
             throw new InvocationInterruptedException(e);
         }
     }
 
     @SuppressFBWarnings(value = "UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR",
             justification = "cannot be called if output consumer is null")
     private void closeConsumer(final ChannelState state) {
 
         if (state != .) {
 
             final Logger logger = ;
             final OutputConsumer<? super OUTPUT> consumer = ;
 
             try {
 
                 logger.dbg("closing consumer (%s)"consumer);
                 consumer.onComplete();
 
             } catch (final InvocationInterruptedException e) {
 
                 throw e;
 
             } catch (final Throwable t) {
 
                 logger.wrn(t"ignoring consumer exception (%s)"consumer);
             }
         }
 
         synchronized () {
 
             if (!isOutputPending()) {
 
                  = .;
                 .notifyAll();
             }
         }
     }
 
     @SuppressWarnings({"SynchronizeOnNonFinalField""unchecked"})
     private void flushOutput(final boolean forceClose) {
 
         Throwable abortException = null;
 
         synchronized () {
 
             final Logger logger = ;
             final ArrayList<Objectoutputs;
             final OutputConsumer<? super OUTPUT> consumer;
             final ChannelState state;
 
             synchronized () {
 
                 consumer = ;
 
                 if (consumer == null) {
 
                     logger.dbg("avoiding flushing output since channel is not bound");
 
                     if (!isOutputPending()) {
 
                          = .;
                     }
 
                     .notifyAll();
                     return;
                 }
 
                 outputs = new ArrayList<Object>();
                 .moveTo(outputs);
                 state = ;
                  = 0;
                 .notifyAll();
             }
 
             synchronized () {
 
                 try {
 
                     for (final Object output : outputs) {
 
                         if (output instanceof RoutineExceptionWrapper) {
 
                             try {
 
                                 logger.dbg("aborting consumer (%s): %s"consumeroutput);
                                 consumer.onError(((RoutineExceptionWrapperoutput).getCause());
 
                             } catch (final InvocationInterruptedException e) {
 
                                 throw e;
 
                             } catch (final Throwable t) {
 
                                 logger.wrn(t"ignoring consumer exception (%s)"consumer);
                             }
 
                             break;
 
                         } else {
 
                             logger.dbg("output consumer (%s): %s"consumeroutput);
                             consumer.onOutput((OUTPUT) output);
                         }
                     }
 
                     if (forceClose || !isOutputPending(state)) {
 
                         closeConsumer(state);
                     }
 
                 } catch (final InvocationInterruptedException e) {
 
                     throw e;
 
                 } catch (final Throwable t) {
 
                     boolean isClose = false;
                     final ChannelState finalState;
 
                     synchronized () {
 
                         logger.wrn(t"consumer exception (%s)");
                         finalState = ;
 
                         if (forceClose || !isOutputPending(finalState)) {
 
                             isClose = true;
 
                         } else if (finalState != .) {
 
                             logger.wrn(t"aborting on consumer exception (%s)");
                             abortException = t;
 
                             .clear();
                              = true;
                              = t;
                              = .;
                         }
                     }
 
                     if (isClose) {
 
                         closeConsumer(finalState);
                     }
                 }
             }
         }
 
         if (abortException != null) {
 
             .onAbort(abortException, 0, .);
         }
     }
 
     private boolean isOutputOpen() {
 
         synchronized () {
 
             return !.isEmpty() || ( != .);
         }
     }
 
     private boolean isOutputPending(@Nonnull final ChannelState state) {
 
         return (state != .) && (state != .);
     }
 
     private boolean isResultComplete() {
 
         return (.ordinal() >= ..ordinal());
     }
 
     private boolean isResultOpen() {
 
         synchronized () {
 
             return ( == .);
         }
     }
 
     @Nullable
     @SuppressWarnings("unchecked")
     private OUTPUT nextOutput(@Nonnull final TimeDuration timeout) {
 
         final Object result = .removeFirst();
         .dbg("reading output [#%d]: %s [%s]"resulttimeout);
 
         RoutineExceptionWrapper.raise(result);
 
         final int maxOutput = ;
         final int prevOutputCount = ;
 
         if ((-- < maxOutput) && (prevOutputCount >= maxOutput)) {
 
             .notifyAll();
         }
 
         return (OUTPUT) result;
     }
 
     @Nullable
     private OUTPUT readQueue(@Nonnull final TimeDuration timeout,
             @Nonnull final TimeoutAction action) {
 
         verifyBound();
 
         final Logger logger = ;
         final NestedQueue<ObjectoutputQueue = ;
 
         if (timeout.isZero() || !outputQueue.isEmpty()) {
 
             if (outputQueue.isEmpty()) {
 
                 logger.wrn("reading output timeout: [%s] => [%s]"timeoutaction);
 
                 if (action == .) {
 
                     throw new ReadDeadlockException("deadlock while waiting for outputs");
                 }
             }
 
             return nextOutput(timeout);
         }
 
         if ( == null) {
 
              = new Check() {
 
                 public boolean isTrue() {
 
                     return !outputQueue.isEmpty();
                 }
             };
         }
 
         final boolean isTimeout;
 
         try {
 
             isTimeout = !timeout.waitTrue();
 
         } catch (final InterruptedException e) {
 
             throw new InvocationInterruptedException(e);
         }
 
         if (isTimeout) {
 
             logger.wrn("reading output timeout: [%s] => [%s]"timeoutaction);
 
             if (action == .) {
 
                 throw new ReadDeadlockException("deadlock while waiting for outputs");
             }
         }
 
         return nextOutput(timeout);
     }
 
     private void verifyBound() {
 
         if ( != null) {
 
             .err("invalid call on bound channel");
             throw new IllegalStateException("the channel is already bound");
         }
     }
 
     private void verifyOutput() {
 
         if () {
 
             final Throwable throwable = ;
             .dbg(throwable"abort exception");
             throw RoutineExceptionWrapper.wrap(throwable).raise();
         }
 
         if (!isResultOpen()) {
 
             .err("invalid call on closed channel");
             throw new IllegalStateException("the channel is closed");
         }
     }

    
Enumeration identifying the channel internal state.
 
     private enum ChannelState {
 
         OUTPUT,     // result channel is open
         RESULT,     // result channel is closed
         FLUSH,      // no more pending outputs
         EXCEPTION,  // abort issued
         ABORTED,    // invocation aborted
         DONE        // output is closed
     }

    
Interface defining an abort handler.
 
     public interface AbortHandler {

        
Called on an abort.

Parameters:
reason the reason of the abortion.
delay the abortion delay.
timeUnit the delay time unit.
 
         void onAbort(@Nullable Throwable reasonlong delay, @Nonnull TimeUnit timeUnit);
     }

    
Default implementation of an output channel iterator.
 
     private class DefaultIterator implements Iterator<OUTPUT> {
 
         private final TimeoutAction mAction;
 
         private final Logger mSubLogger = .subContextLogger(this);
 
         private final TimeDuration mTimeout;
 
         private boolean mRemoved = true;

        
Constructor.

Parameters:
timeout the output timeout.
action the timeout action.
 
         private DefaultIterator(@Nonnull final TimeDuration timeout,
                 @Nonnull final TimeoutAction action) {
 
              = timeout;
              = action;
         }
 
         public boolean hasNext() {
 
             boolean isAbort = false;
 
             synchronized () {
 
                 verifyBound();
 
                 final Logger logger = ;
                 final TimeDuration timeout = ;
                 final NestedQueue<ObjectoutputQueue = ;
 
                 if (timeout.isZero() || ( == .)) {
 
                     final boolean hasNext = !outputQueue.isEmpty();
 
                     if (!hasNext && ( != .)) {
 
                         final TimeoutAction action = ;
                         logger.wrn("has output timeout: [%s] => [%s]"timeoutaction);
 
                         if (action == .) {
 
                             throw new ReadDeadlockException(
                                     "deadlock while waiting to know if more outputs are coming");
 
                         } else {
 
                             isAbort = (action == .);
                         }
                     }
 
                 } else {
 
                     if ( == null) {
 
                          = new Check() {
 
                             public boolean isTrue() {
 
                                 return !outputQueue.isEmpty() || ( == .);
                             }
                         };
                     }
 
                     final boolean isTimeout;
 
                     try {
 
                         isTimeout = !timeout.waitTrue();
 
                     } catch (final InterruptedException e) {
 
                         throw new InvocationInterruptedException(e);
                     }
 
                     if (isTimeout) {
 
                         final TimeoutAction action = ;
                         logger.wrn("has output timeout: [%s] => [%s]"timeoutaction);
 
                         if (action == .) {
 
                             throw new ReadDeadlockException(
                                     "deadlock while waiting to know if more outputs are coming");
 
                         } else {
 
                             isAbort = (action == .);
                         }
                     }
                 }
 
                 if (!isAbort) {
 
                     final boolean hasNext = !outputQueue.isEmpty();
                     logger.dbg("has output: %s [%s]"hasNexttimeout);
                     return hasNext;
                 }
             }
 
             abort();
             throw new AbortException(null);
         }
 
         @Nullable
         @SuppressFBWarnings(value = "IT_NO_SUCH_ELEMENT",
                 justification = "NestedQueue.removeFirst() actually throws it")
         public OUTPUT next() {
 
             boolean isAbort = false;
 
             try {
 
                 synchronized () {
 
                     final TimeoutAction action = ;
                     isAbort = (action == .);
 
                     final OUTPUT next = readQueue(action);
                      = false;
                     return next;
                 }
 
             } catch (final NoSuchElementException e) {
 
                 if (isAbort) {
 
                     abort();
                     throw new AbortException(null);
                 }
 
                 throw e;
             }
         }
 
         public void remove() {
 
             synchronized () {
 
                 verifyBound();
 
                 if () {
 
                     .err("invalid output remove");
                     throw new IllegalStateException("the element has been already removed");
                 }
 
                  = true;
             }
         }
     }

    
Default implementation of a routine output channel.
 
     private class DefaultOutputChannel implements OutputChannel<OUTPUT> {
 
         private final Logger mSubLogger = .subContextLogger(this);
 
         private TimeDuration mReadTimeout = ;
 
         private TimeoutAction mTimeoutAction = .;
 
         @Nonnull
         @SuppressWarnings("ConstantConditions")
         public OutputChannel<OUTPUT> afterMax(@Nonnull final TimeDuration timeout) {
 
             synchronized () {
 
                if (timeout == null) {
                    .err("invalid null timeout");
                    throw new NullPointerException("the output timeout must not be null");
                }
                 = timeout;
            }
            return this;
        }
        @Nonnull
        public OutputChannel<OUTPUT> afterMax(final long timeout,
                @Nonnull final TimeUnit timeUnit) {
            return afterMax(fromUnit(timeouttimeUnit));
        }
        @Nonnull
        @SuppressWarnings("ConstantConditions")
        public OutputChannel<OUTPUT> bind(@Nonnull final OutputConsumer<? super OUTPUT> consumer) {
            final boolean forceClose;
            final ChannelState state;
            synchronized () {
                verifyBound();
                if (consumer == null) {
                    .err("invalid null consumer");
                    throw new NullPointerException("the output consumer must not be null");
                }
                state = ;
                forceClose = (state == .);
                 = consumer;
                 = getMutex(consumer);
            }
            flushOutput(forceClose);
            return this;
        }
        public boolean checkComplete() {
            final boolean isDone;
            synchronized () {
                final TimeDuration timeout = ;
                try {
                    isDone = timeout.waitTrue(new Check() {
                        public boolean isTrue() {
                            return ( == .);
                        }
                    });
                } catch (final InterruptedException e) {
                    throw new InvocationInterruptedException(e);
                }
                if (!isDone) {
                    .wrn("waiting complete timeout: [%s]"timeout);
                }
            }
            return isDone;
        }
        @Nonnull
        public OutputChannel<OUTPUT> eventually() {
            return afterMax();
        }
        @Nonnull
        public OutputChannel<OUTPUT> eventuallyAbort() {
            synchronized () {
                 = .;
            }
            return this;
        }
        @Nonnull
        public OutputChannel<OUTPUT> eventuallyDeadlock() {
            synchronized () {
                 = .;
            }
            return this;
        }
        @Nonnull
        public OutputChannel<OUTPUT> eventuallyExit() {
            synchronized () {
                 = .;
            }
            return this;
        }
        @Nonnull
        public OutputChannel<OUTPUT> immediately() {
            return afterMax();
        }
        public boolean isBound() {
            synchronized () {
                return ( != null);
            }
        }
        @Nonnull
        public List<OUTPUT> readAll() {
            final ArrayList<OUTPUT> results = new ArrayList<OUTPUT>();
            readAllInto(results);
            return results;
        }
        @Nonnull
        @SuppressWarnings({"unchecked""ConstantConditions"})
        public OutputChannel<OUTPUT> readAllInto(
                @Nonnull final Collection<? super OUTPUT> results) {
            boolean isAbort = false;
            synchronized () {
                verifyBound();
                final Logger logger = ;
                if (results == null) {
                    logger.err("invalid null output list");
                    throw new NullPointerException("the result list must not be null");
                }
                final NestedQueue<ObjectoutputQueue = ;
                final TimeDuration timeout = ;
                if (timeout.isZero() || ( == .)) {
                    while (!outputQueue.isEmpty()) {
                        final OUTPUT result = nextOutput(timeout);
                        logger.dbg("adding output to list: %s [%s]"resulttimeout);
                        results.add(result);
                    }
                    if ( != .) {
                        final TimeoutAction action = ;
                        logger.wrn("list output timeout: [%s] => [%s]"timeoutaction);
                        if (action == .) {
                            throw new ReadDeadlockException(
                                    "deadlock while waiting to collect all outputs");
                        } else {