Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.gh.bmd.jrt.routine;
 
 
 
 
 
 import static com.gh.bmd.jrt.time.TimeDuration.ZERO;
 import static com.gh.bmd.jrt.time.TimeDuration.fromUnit;

 /**
  * Default implementation of a parameter input channel.
  * <p/>
  * Created by davide on 9/24/14.
  *
  * @param <INPUT>  the input data type.
  * @param <OUTPUT> the output data type.
  */
 
 class DefaultParameterChannel<INPUT, OUTPUT> implements ParameterChannel<INPUT, OUTPUT> {
 
     private final ArrayList<OutputChannel<?>> mBoundChannels = new ArrayList<OutputChannel<?>>();
 
     private final DefaultExecution<INPUT, OUTPUT> mExecution;
 
     private final Check mHasInputs;
 
     private final NestedQueue<INPUT> mInputQueue;
 
     private final TimeDuration mInputTimeout;
 
     private final Logger mLogger;
 
     private final int mMaxInput;
 
     private final Object mMutex = new Object();
 
     private final DefaultResultChannel<OUTPUT> mResultChanel;
 
     private final Runner mRunner;
 
     private Throwable mAbortException;
 
     private int mInputCount;
 
     private TimeDuration mInputDelay = ;
 
     private boolean mIsConsuming;
 
     private boolean mIsPendingExecution;
 
     private int mPendingExecutionCount;
 
     private ChannelState mState = .;

    
     /**
      * Constructor.
      *
      * @param configuration the routine configuration.
      * @param manager       the invocation manager.
      * @param runner        the runner instance.
      * @param logger        the logger instance.
      * @throws java.lang.IllegalArgumentException if at least one of the parameters is invalid.
      * @throws java.lang.NullPointerException     if one of the parameters is null.
      */
 
     // NOTE(review): this listing lost every field reference, several qualified constants and
     // the commas next to them during extraction, so the statements below are incomplete as
     // shown. The comments describe only what is still visible; restore the missing tokens from
     // the original source before compiling.
     DefaultParameterChannel(@Nonnull final RoutineConfiguration configuration,
            @Nonnull final InvocationManager<INPUT, OUTPUT> manager, @Nonnull final Runner runner,
            @Nonnull final Logger logger) {
        // Assignment targets are missing; presumably mLogger, mRunner, mMaxInput and
        // mInputTimeout in this order - TODO confirm. The default values passed to the
        // configuration getters are missing as well.
         = logger.subContextLogger(this);
         = runner;
         = configuration.getInputSizeOr(.);
         = configuration.getInputTimeoutOr();
        if ( == null) {
            throw new NullPointerException("the input timeout must not be null");
        }
        final int maxInputSize = ;
        if (maxInputSize < 1) {
            throw new IllegalArgumentException("the input buffer size cannot be 0 or negative");
        }
        // Selects the queue implementation based on the configured input order (the compared
        // constants are missing).
         = (configuration.getInputOrderOr(.) == .)
                ? new SimpleNestedQueue<INPUT>() : new OrderedNestedQueue<INPUT>();
        // Condition which holds while a counter (missing; presumably mInputCount - TODO confirm)
        // does not exceed the maximum input size.
         = new Check() {
            public boolean isTrue() {
                return ( <= maxInputSize);
            }
        };
        // The result channel is given a handler invoked when the result side is aborted.
         = new DefaultResultChannel<OUTPUT>(configurationnew AbortHandler() {
            public void onAbort(@Nullable final Throwable reasonfinal long delay,
                    @Nonnull final TimeUnit timeUnit) {
                final boolean isDone;
                synchronized () {
                    // Already aborted: nothing else to do.
                    if ( == .) {
                        .wrn("avoiding aborting result channel since invocation is aborted");
                        return;
                    }
                    isDone = ( == .);
                    if (isDone) {
                        .dbg(
                                "avoiding aborting result channel since invocation is complete");
                    } else {
                        // Record the abort reason and switch state (targets missing).
                        .dbg("aborting result channel");
                         = reason;
                         = .;
                    }
                }
                // The scheduling happens outside the synchronized block, with the requested delay.
                if (isDone) {
                    .run(new AbortResultExecution(reason), delaytimeUnit);
                } else {
                    .run(.abort(), delaytimeUnit);
                }
            }
        }, runnerlogger);
         = new DefaultExecution<INPUT, OUTPUT>(managernew DefaultInputIterator(),
                                                         logger);
    }
    public boolean abort() {
        return abort(null);
    }
    public boolean abort(@Nullable final Throwable reason) {
        final TimeDuration delay;
        synchronized () {
            if (!isOpen()) {
                .dbg(reason"avoiding aborting since channel is closed");
                return false;
            }
            delay = ;
            if (delay.isZero()) {
                .dbg(reason"aborting channel");
                 =
                        (reason instanceof AbortException) ? reason : new AbortException(reason);
                 = .;
            }
        }
        if (delay.isZero()) {
            .run(.abort(), 0, .);
        } else {
            .run(new DelayedAbortExecution(reason), delay.timedelay.unit);
        }
        return true;
    }
    public boolean isOpen() {
        synchronized () {
            return ( == .);
        }
    }
    @Nonnull
    @SuppressWarnings("ConstantConditions")
    public ParameterChannel<INPUT, OUTPUT> after(@Nonnull final TimeDuration delay) {
        synchronized () {
            verifyOpen();
            if (delay == null) {
                .err("invalid null delay");
                throw new NullPointerException("the input delay must not be null");
            }
             = delay;
        }
        return this;
    }
    @Nonnull
    public ParameterChannel<INPUT, OUTPUT> after(final long delay,
            @Nonnull final TimeUnit timeUnit) {
        return after(fromUnit(delaytimeUnit));
    }
    @Nonnull
    public ParameterChannel<INPUT, OUTPUT> now() {
        return after();
    }
    @Nonnull
    public ParameterChannel<INPUT, OUTPUT> pass(
            @Nullable final OutputChannel<? extends INPUT> channel) {
        final TimeDuration delay;
        final DefaultOutputConsumer consumer;
        synchronized () {
            verifyOpen();
            if (channel == null) {
                .wrn("passing null channel");
                return this;
            }
            .add(channel);
            delay = ;
            ++;
            .dbg("passing channel: %s"channel);
            consumer = new DefaultOutputConsumer(delay);
        }
        channel.bind(consumer);
        return this;
    }
    @Nonnull
    public ParameterChannel<INPUT, OUTPUT> pass(@Nullable final Iterable<? extends INPUT> inputs) {
        NestedQueue<INPUT> inputQueue;
        ArrayList<INPUT> list = null;
        boolean needsExecution = false;
        final TimeDuration delay;
        synchronized () {
            verifyOpen();
            if (inputs == null) {
                .wrn("passing null iterable");
                return this;
            }
            inputQueue = ;
            delay = ;
            int count = 0;
            if (delay.isZero()) {
                for (final INPUT input : inputs) {
                    inputQueue.add(input);
                    ++count;
                }
            } else {
                inputQueue = inputQueue.addNested();
                list = new ArrayList<INPUT>();
                for (final INPUT input : inputs) {
                    list.add(input);
                }
                count = list.size();
            }
            .dbg("passing iterable [#%d+%d]: %s [%s]"countinputsdelay);
            addInputs(count);
            if (delay.isZero()) {
                needsExecution = !;
                if (needsExecution) {
                    ++;
                     = true;
                }
            } else {
                ++;
            }
        }
        if (delay.isZero()) {
            if (needsExecution) {
                .run(, 0, .);
            }
        } else {
            .run(new DelayedListInputExecution(inputQueuelist), delay.timedelay.unit);
        }
        return this;
    }
    @Nonnull
    public ParameterChannel<INPUT, OUTPUT> pass(@Nullable final INPUT input) {
        NestedQueue<INPUT> inputQueue;
        boolean needsExecution = false;
        final TimeDuration delay;
        synchronized () {
            verifyOpen();
            inputQueue = ;
            delay = ;
            if (delay.isZero()) {
                inputQueue.add(input);
            } else {
                inputQueue = inputQueue.addNested();
            }
            .dbg("passing input [#%d+1]: %s [%s]"inputdelay);
            addInputs(1);
            if (delay.isZero()) {
                needsExecution = !;
                if (needsExecution) {
                    ++;
                     = true;
                }
            } else {
                ++;
            }
        }
        if (delay.isZero()) {
            if (needsExecution) {
                .run(, 0, .);
            }
        } else {
            .run(new DelayedInputExecution(inputQueueinput), delay.timedelay.unit);
        }
        return this;
    }
    @Nonnull
    public ParameterChannel<INPUT, OUTPUT> pass(@Nullable final INPUT... inputs) {
        synchronized () {
            verifyOpen();
            if (inputs == null) {
                .wrn("passing null input array");
                return this;
            }
        }
        return pass(Arrays.asList(inputs));
    }
    @Nonnull
    public OutputChannel<OUTPUT> result() {
        final boolean needsExecution;
        synchronized () {
            verifyOpen();
            .dbg("closing input channel");
             = .;
            needsExecution = ! && !;
            if (needsExecution) {
                ++;
                 = true;
            }
        }
        if (needsExecution) {
            .run(, 0, .);
        }
        return .getOutput();
    }
    // Accounts for newly passed inputs, then waits for room in the input channel.
    // Throws InputDeadlockException when the wait reports failure, and wraps an interruption
    // into an InvocationInterruptedException.
    // NOTE(review): the listing lost the field references here. The incremented counter is
    // presumably mInputCount, and the wait is presumably mInputTimeout.waitTrue(...) taking the
    // mutex and the mHasInputs condition - confirm against the original source.
    private void addInputs(final int count) {
         += count;
        try {
            if (!.waitTrue()) {
                throw new InputDeadlockException(
                        "deadlock while waiting for room in the input channel");
            }
        } catch (final InterruptedException e) {
            throw new InvocationInterruptedException(e);
        }
    }
    private boolean isInput() {
        return (( == .) || ( == .));
    }
    // Tells whether the input phase is over: the state equals a specific constant, a counter is
    // not positive and a boolean flag is not set.
    // NOTE(review): all three operands were lost in extraction. Presumably they are
    // mState == ChannelState.OUTPUT, mPendingExecutionCount and mIsConsuming - confirm against
    // the original source.
    private boolean isInputComplete() {
        return ( == .) && ( <= 0) && !;
    }
    private void verifyOpen() {
        if ( == .) {
            final Throwable throwable = ;
            .dbg(throwable"abort exception");
            throw RoutineExceptionWrapper.wrap(throwable).raise();
        }
        if (!isOpen()) {
            .err("invalid call on closed channel");
            throw new IllegalStateException("the input channel is closed");
        }
    }

    
    /**
     * Enumeration identifying the channel internal state.
     */
    private enum ChannelState {
        INPUT,      // input channel is open
        OUTPUT,     // no more inputs
        RESULT,     // result called
        EXCEPTION   // abort issued
    }

    
    /**
     * Interface defining an object managing the creation and the recycling of invocation
     * instances.
     *
     * @param <INPUT>  the input data type.
     * @param <OUTPUT> the output data type.
     */
    interface InvocationManager<INPUT, OUTPUT> {

        
Creates and returns a new invocation instance.

Returns:
the invocation instance.
        @Nonnull
        Invocation<INPUT, OUTPUT> create();

        
Discards the specified invocation.

Parameters:
invocation the invocation instance.
        void discard(@Nonnull Invocation<INPUT, OUTPUT> invocation);

        
Recycles the specified invocation.

Parameters:
invocation the invocation instance.
        void recycle(@Nonnull Invocation<INPUT, OUTPUT> invocation);
    }

    
    /**
     * Implementation of an execution handling the abortion of the result channel.
     */
    private class AbortResultExecution implements Execution {
        private Throwable mReason;

        
Constructor.

Parameters:
reason the reason of the abortion.
        private AbortResultExecution(@Nullable final Throwable reason) {
             = reason;
        }
        public void run() {
            .close();
        }
    }

    
    /**
     * Default implementation of an input iterator.
     */
    private class DefaultInputIterator implements InputIterator<INPUT> {
        @Nullable
        public Throwable getAbortException() {
            synchronized () {
                return ;
            }
        }
        public boolean hasInput() {
            synchronized () {
                return isInput() && !.isEmpty();
            }
        }
        public boolean isAborting() {
            synchronized () {
                return ( == .);
            }
        }
        @Nullable
        @SuppressFBWarnings(value = "NO_NOTIFY_NOT_NOTIFYALL",
                justification = "only one input is released")
        public INPUT nextInput() {
            synchronized () {
                final INPUT input = .removeFirst();
                .dbg("reading input [#%d]: %s"input);
                final int maxInput = ;
                final int prevInputCount = ;
                if ((-- < maxInput) && (prevInputCount >= maxInput)) {
                    .notify();
                }
                return input;
            }
        }
        public void onAbortComplete() {
            final Throwable exception;
            final ArrayList<OutputChannel<?>> channels;
            synchronized () {
                exception = ;
                .dbg(exception"aborting bound channels [%d]".size());
                channels = new ArrayList<OutputChannel<?>>();
                .clear();
            }
            for (final OutputChannel<?> channel : channels) {
                channel.abort(exception);
            }
        }
        public boolean onConsumeComplete() {
            synchronized () {
                 = false;
                return isInputComplete();
            }
        }
        public void onConsumeStart() {
            synchronized () {
                if (!isInput()) {
                    .wrn("avoiding consuming input since invocation is complete [#%d]",
                                );
                    return;
                }
                .dbg("consuming input [#%d]");
                --;
                 = false;
                 = true;
            }
        }
        public void onInvocationComplete() {
            synchronized () {
                if ( != .) {
                     = .;
                }
            }
        }
    }

    
    /**
     * Default implementation of an output consumer pushing the data to consume into the input
     * channel queue.
     */
    private class DefaultOutputConsumer implements OutputConsumer<INPUT> {
        private final TimeDuration mDelay;
        private final NestedQueue<INPUT> mQueue;
        private final Logger mSubLogger = .subContextLogger(this);

        
Constructor.

Parameters:
delay the output delay.
        private DefaultOutputConsumer(@Nonnull final TimeDuration delay) {
             = delay;
             = .addNested();
        }
        public void onComplete() {
            final boolean needsExecution;
            synchronized () {
                verifyInput();
                .dbg("closing consumer");
                .close();
                needsExecution = ! && !;
                if (needsExecution) {
                     = true;
                } else {
                    --;
                }
            }
            if (needsExecution) {
                .run(, 0, .);
            }
        }
        public void onError(@Nullable final Throwable error) {
            synchronized () {
                if (!isInput()) {
                    .wrn("avoiding aborting consumer since channel is closed");
                    return;
                }
                .dbg("aborting consumer");
                 = error;
                 = .;
            }
            final TimeDuration delay = ;
            .run(.abort(), delay.timedelay.unit);
        }
        public void onOutput(final INPUT output) {
            NestedQueue<INPUT> inputQueue;
            boolean needsExecution = false;
            final TimeDuration delay = ;
            synchronized () {
                verifyInput();
                inputQueue = ;
                if (delay.isZero()) {
                    inputQueue.add(output);
                } else {
                    inputQueue = inputQueue.addNested();
                }
                .dbg("consumer input [#%d+1]: %s [%s]"outputdelay);
                addInputs(1);
                if (delay.isZero()) {
                    needsExecution = !;
                    if (needsExecution) {
                        ++;
                         = true;
                    }
                } else {
                    ++;
                }
            }
            if (delay.isZero()) {
                if (needsExecution) {
                    .run(, 0, .);
                }
            } else {
                .run(new DelayedInputExecution(inputQueueoutput), delay.timedelay.unit);
            }
        }
        private void verifyInput() {
            if ( == .) {
                final Throwable throwable = ;
                .dbg(throwable"consumer abort exception");
                throw RoutineExceptionWrapper.wrap(throwable).raise();
            }
            if (!isInput()) {
                .dbg("consumer invalid call on closed channel");
                throw new IllegalStateException("the input channel is closed");
            }
        }
    }

    
    /**
     * Implementation of an execution handling a delayed abort.
     */
    private class DelayedAbortExecution implements Execution {
        private final Throwable mThrowable;

        
Constructor.

Parameters:
throwable the reason of the abort.
        private DelayedAbortExecution(@Nullable final Throwable throwable) {
             = throwable;
        }
        // Performs the abort once the delay has elapsed, unless the channel got closed in the
        // meantime.
        // NOTE(review): the field references were lost in extraction. The local is presumably
        // read from mThrowable and the block presumably synchronizes on the channel mutex; the
        // state constant compared in the guard cannot be determined from this listing - confirm
        // against the original source.
        public void run() {
            final Throwable throwable = ;
            synchronized () {
                if (isInputComplete() || ( == .)) {
                    .dbg(throwable"avoiding aborting since channel is closed");
                    return;
                }
                // Record the abort reason and switch state (assignment targets missing).
                .dbg(throwable"aborting channel");
                 = throwable;
                 = .;
            }
            // The abort execution is scheduled with zero delay, outside the synchronized block.
            .run(.abort(), 0, .);
        }
    }

    
    /**
     * Implementation of an execution handling a delayed input.
     */
    private class DelayedInputExecution implements Execution {
        private final INPUT mInput;
        private final NestedQueue<INPUT> mQueue;

        
Constructor.

Parameters:
queue the input queue.
input the input.
        private DelayedInputExecution(@Nonnull final NestedQueue<INPUT> queue,
                @Nullable final INPUT input) {
             = queue;
             = input;
        }
        public void run() {
            synchronized () {
                if (!isInput()) {
                    .dbg("avoiding delayed input execution since channel is closed: %s",
                                );
                    return;
                }
                final NestedQueue<INPUT> queue = ;
                .dbg("delayed input execution: %s");
                queue.add();
                queue.close();
            }
            .run();
        }
    }

    
    /**
     * Implementation of an execution handling a delayed input of a list of data.
     */
    private class DelayedListInputExecution implements Execution {
        private final ArrayList<INPUT> mInputs;
        private final NestedQueue<INPUT> mQueue;

        
Constructor.

Parameters:
queue the input queue.
inputs the list of input data.
        private DelayedListInputExecution(@Nonnull final NestedQueue<INPUT> queue,
                final ArrayList<INPUT> inputs) {
             = inputs;
             = queue;
        }
        public void run() {
            synchronized () {
                if (!isInput()) {
                    .dbg("avoiding delayed input execution since channel is closed: %s",
                                );
                    return;
                }
                final NestedQueue<INPUT> queue = ;
                .dbg("delayed input execution: %s");
                queue.addAll();
                queue.close();
            }
            .run();
        }
    }
New to GrepCode? Check out our FAQ X