Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.gh.bmd.jrt.routine;
 
 
 
 
 
 import static com.gh.bmd.jrt.builder.RoutineConfiguration.RunnerType;
 import static com.gh.bmd.jrt.time.TimeDuration.ZERO;

Basic abstract implementation of a routine.

This class provides a default implementation of all the routine functionalities. The inheriting class just need to create invocation objects when required.

Created by davide on 9/7/14.

Parameters:
<INPUT> the input data type.
<OUTPUT> the output data type.
 
 public abstract class AbstractRoutine<INPUT, OUTPUT> extends TemplateRoutine<INPUT, OUTPUT> {
 
     private static final TimeDuration DEFAULT_AVAIL_TIMEOUT = ;
 
     private static final int DEFAULT_CORE_INVOCATIONS = 10;
 
     private static final int DEFAULT_MAX_INVOCATIONS = .;
 
     private final LinkedList<Invocation<INPUT, OUTPUT>> mAsyncInvocations =
             new LinkedList<Invocation<INPUT, OUTPUT>>();
 
     private final Runner mAsyncRunner;
 
     private final TimeDuration mAvailTimeout;
 
     private final RoutineConfiguration mConfiguration;
 
     private final int mCoreInvocations;
 
     private final Logger mLogger;
 
     private final int mMaxInvocations;
 
     private final Object mMutex = new Object();
 
     private final Object mParallelMutex = new Object();
 
     private final LinkedList<Invocation<INPUT, OUTPUT>> mSyncInvocations =
             new LinkedList<Invocation<INPUT, OUTPUT>>();
 
     private final Runner mSyncRunner;
 
     private volatile DefaultInvocationManager mAsyncManager;
 
     private AbstractRoutine<INPUT, OUTPUT> mParallelRoutine;
 
     private int mRunningCount;
 
     private final Check mIsInvocationAvailable = new Check() {
 
         public boolean isTrue() {
 
             return  < ;
         }
     };
 
     private volatile DefaultInvocationManager mSyncManager;

    
Constructor.

Parameters:
configuration the routine configuration.
Throws:
java.lang.IllegalArgumentException if at least one of the parameter is invalid.
java.lang.NullPointerException if the specified configuration is null.
    @SuppressWarnings("ConstantConditions")
    protected AbstractRoutine(@Nonnull final RoutineConfiguration configuration) {
         = configuration;
         = (configuration.getSyncRunnerOr(.) == .)
                ? Runners.queuedRunner() : Runners.sequentialRunner();
         = configuration.getRunnerOr(Runners.sharedRunner());
         = configuration.getAvailTimeoutOr();
         = Logger.newLogger(configurationthis);
        .dbg("building routine with configuration: %s"configuration);
    }

    
Constructor.

Parameters:
configuration the routine configuration.
syncRunner the runner used for synchronous invocation.
asyncRunner the runner used for asynchronous invocation.
logger the logger instance.
    private AbstractRoutine(@Nonnull final RoutineConfiguration configuration,
            @Nonnull final Runner syncRunner, @Nonnull final Runner asyncRunner,
            @Nonnull final Logger logger) {
         = configuration;
         = syncRunner;
         = asyncRunner;
         = logger.subContextLogger(this);
    }
    @Nonnull
    public ParameterChannel<INPUT, OUTPUT> invokeAsync() {
        return invoke(true);
    }
    @Nonnull
    public ParameterChannel<INPUT, OUTPUT> invokeParallel() {
        synchronized () {
            .dbg("invoking routine: parallel");
            if ( == null) {
                 = new AbstractRoutine<INPUT, OUTPUT>(,
                                                                      ) {
                    @Nonnull
                    @Override
                    protected Invocation<INPUT, OUTPUT> newInvocation(final boolean async) {
                        return new ParallelInvocation<INPUT, OUTPUT>(AbstractRoutine.this);
                    }
                };
            }
        }
        return .invokeAsync();
    }
    @Nonnull
    public ParameterChannel<INPUT, OUTPUT> invokeSync() {
        return invoke(false);
    }
    @Override
    public void purge() {
        synchronized () {
            final Logger logger = ;
            final LinkedList<Invocation<INPUT, OUTPUT>> syncInvocations = ;
            for (final Invocation<INPUT, OUTPUT> invocation : syncInvocations) {
                try {
                    invocation.onDestroy();
                } catch (final InvocationInterruptedException e) {
                    throw e;
                } catch (final Throwable t) {
                    logger.wrn(t"ignoring exception while destroying invocation instance");
                }
            }
            syncInvocations.clear();
            final LinkedList<Invocation<INPUT, OUTPUT>> asyncInvocations = ;
            for (final Invocation<INPUT, OUTPUT> invocation : asyncInvocations) {
                try {
                    invocation.onDestroy();
                } catch (final InvocationInterruptedException e) {
                    throw e;
                } catch (final Throwable t) {
                    logger.wrn(t"ignoring exception while destroying invocation instance");
                }
            }
            asyncInvocations.clear();
        }
    }

    
Converts an invocation instance from synchronous to asynchronous or the contrary.

Parameters:
async whether the converted invocation is asynchronous.
invocation the invocation to convert.
Returns:
the converted invocation.
    @Nonnull
    @SuppressWarnings("UnusedParameters")
    protected Invocation<INPUT, OUTPUT> convertInvocation(final boolean async,
            @Nonnull final Invocation<INPUT, OUTPUT> invocation) {
        return invocation;
    }

    
Returns the routine logger.

Returns:
the logger instance.
    @Nonnull
    protected Logger getLogger() {
        return ;
    }

    
Creates a new invocation instance.

Parameters:
async whether the invocation is asynchronous.
Returns:
the invocation instance.
    @Nonnull
    protected abstract Invocation<INPUT, OUTPUT> newInvocation(boolean async);
    @Nonnull
    private DefaultInvocationManager getInvocationManager(final boolean async) {
        if (async) {
            if ( == null) {
                 = new DefaultInvocationManager(true);
            }
            return ;
        }
        if ( == null) {
             = new DefaultInvocationManager(false);
        }
        return ;
    }
    @Nonnull
    private ParameterChannel<INPUT, OUTPUT> invoke(final boolean async) {
        final Logger logger = ;
        logger.dbg("invoking routine: %ssync", (async) ? "a" : "");
        return new DefaultParameterChannel<INPUT, OUTPUT>(,
                                                          getInvocationManager(async),
                                                          (async) ?  : ,
                                                          logger);
    }

    
Implementation of an invocation handling parallel mode.

Parameters:
<INPUT> the input data type.
<OUTPUT> the output data type.
    private static class ParallelInvocation<INPUT, OUTPUT>
            extends TemplateInvocation<INPUT, OUTPUT> {
        private final Routine<INPUT, OUTPUT> mRoutine;

        
Constructor.

Parameters:
routine the routine to invoke in parallel mode.
        private ParallelInvocation(@Nonnull final Routine<INPUT, OUTPUT> routine) {
             = routine;
        }
        @Override
        public void onInput(final INPUT input, @Nonnull final ResultChannel<OUTPUT> result) {
            result.pass(.callAsync(input));
        }
    }

    
Default implementation of an invocation manager supporting recycling of invocation instances.
    private class DefaultInvocationManager implements InvocationManager<INPUT, OUTPUT> {
        private final boolean mAsync;

        
Constructor.

Parameters:
async whether the invocation is asynchronous.
        private DefaultInvocationManager(final boolean async) {
             = async;
        }
        @Nonnull
        public Invocation<INPUT, OUTPUT> create() {
            synchronized () {
                final boolean isTimeout;
                try {
                    isTimeout = !.waitTrue();
                } catch (final InterruptedException e) {
                    .err(e"waiting for available instances interrupted [#%d]",
                                );
                    throw new InvocationInterruptedException(e);
                }
                if (isTimeout) {
                    .wrn("routine instance not available after timeout [#%d]: %s",
                                );
                    throw new RoutineDeadlockException(
                            "deadlock while waiting for an available invocation instance");
                }
                ++;
                final boolean async = ;
                final LinkedList<Invocation<INPUT, OUTPUT>> invocations =
                        (async) ?  : ;
                if (!invocations.isEmpty()) {
                    final Invocation<INPUT, OUTPUT> invocation = invocations.removeFirst();
                    .dbg("reusing %ssync invocation instance [%d/%d]: %s",
                                (async) ? "a" : ""invocations.size() + 1, ,
                                invocation);
                    return invocation;
                } else {
                    final LinkedList<Invocation<INPUT, OUTPUT>> fallbackInvocations =
                            (async) ?  : ;
                    if (!fallbackInvocations.isEmpty()) {
                        final Invocation<INPUT, OUTPUT> invocation =
                                fallbackInvocations.removeFirst();
                        .dbg("converting %ssync invocation instance [%d/%d]: %s",
                                    (async) ? "a" : ""invocations.size() + 1, ,
                                    invocation);
                        return convertInvocation(asyncinvocation);
                    }
                }
                .dbg("creating %ssync invocation instance [1/%d]", (async) ? "a" : "",
                            );
                return newInvocation(async);
            }
        }
        @SuppressFBWarnings(value = "NO_NOTIFY_NOT_NOTIFYALL",
                justification = "only one invocation is released")
        public void discard(@Nonnull final Invocation<INPUT, OUTPUT> invocation) {
            synchronized () {
                final Logger logger = ;
                logger.wrn("discarding invocation instance after error: %s"invocation);
                try {
                    invocation.onDestroy();
                } catch (final InvocationInterruptedException e) {
                    throw e;
                } catch (final Throwable t) {
                    logger.wrn(t"ignoring exception while destroying invocation instance");
                }
                --;
                .notify();
            }
        }
        @SuppressFBWarnings(value = "NO_NOTIFY_NOT_NOTIFYALL",
                justification = "only one invocation is released")
        public void recycle(@Nonnull final Invocation<INPUT, OUTPUT> invocation) {
            synchronized () {
                final Logger logger = ;
                final boolean async = ;
                final LinkedList<Invocation<INPUT, OUTPUT>> syncInvocations = ;
                final LinkedList<Invocation<INPUT, OUTPUT>> asyncInvocations = ;
                if ((syncInvocations.size() + asyncInvocations.size()) < ) {
                    final LinkedList<Invocation<INPUT, OUTPUT>> invocations =
                            (async) ? asyncInvocations : syncInvocations;
                    logger.dbg("recycling %ssync invocation instance [%d/%d]: %s",
                               (async) ? "a" : ""invocations.size() + 1, ,
                               invocation);
                    invocations.add(invocation);
                } else {
                    logger.wrn("discarding %ssync invocation instance [%d/%d]: %s",
                               (async) ? "a" : ""invocation);
                    try {
                        invocation.onDestroy();
                    } catch (final InvocationInterruptedException e) {
                        throw e;
                    } catch (final Throwable t) {
                        logger.wrn(t"ignoring exception while destroying invocation instance");
                    }
                }
                --;
                .notify();
            }
        }
    }
New to GrepCode? Check out our FAQ X