Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License"). You may not
   * use this file except in compliance with the License. A copy of the License is
   * located at
   * 
   * http://aws.amazon.com/apache2.0
   * 
  * or in the "license" file accompanying this file. This file is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
 package com.amazonaws.services.simpleworkflow.flow.worker;
 
 
 
 
 public abstract class GenericWorker implements WorkerBase {
 
     class ExecutorThreadFactory implements ThreadFactory {
 
         private AtomicInteger threadIndex = new AtomicInteger();
 
         private final String threadPrefix;
 
         public ExecutorThreadFactory(String threadPrefix) {
             this. = threadPrefix;
         }
 
         @Override
         public Thread newThread(Runnable r) {
             Thread result = new Thread(r);
             result.setName( + (.incrementAndGet()));
             result.setUncaughtExceptionHandler();
             return result;
         }
     }
 
     private class PollServiceTask implements Runnable {
 
         private final TaskPoller poller;
 
         PollServiceTask(TaskPoller poller) {
             this. = poller;
         }
 
         @Override
         public void run() {
             try {
                 if (.isDebugEnabled()) {
                     .debug("poll task begin");
                 }
 
                 if (.isTerminating()) {
                     return;
                 }
                 .throttle();
                 if (.isTerminating()) {
                     return;
                 }
                 if ( != null) {
                     .throttle();
                 }
 
                 CountDownLatch suspender = GenericWorker.this..get();
                 if (suspender != null) {
                     if (.isDebugEnabled()) {
                         .debug("poll task suspending latchCount=" + suspender.getCount());
                     }
                     suspender.await();
                 }
 
                 if (.isTerminating()) {
                     return;
                 }
                 .pollAndProcessSingleTask();
                 .success();
             }
             catch (Throwable e) {
                 .failure();
                 if (!(e.getCause() instanceof InterruptedException)) {
                    .uncaughtException(Thread.currentThread(), e);
                }
            }
            finally {
                // Resubmit itself back to pollExecutor
                if (!.isShutdown()) {
                    .execute(this);
                }
            }
        }
    }
    private static final Log log = LogFactory.getLog(GenericWorker.class);
    protected static final int MAX_IDENTITY_LENGTH = 256;
    protected AmazonSimpleWorkflow service;
    protected String domain;
    protected boolean registerDomain;
    protected long domainRetentionPeriodInDays = .;
    private String taskListToPoll;
    private int maximumPollRateIntervalMilliseconds = 1000;
    private double maximumPollRatePerSecond;
    private double pollBackoffCoefficient = 2;
    private long pollBackoffInitialInterval = 100;
    private long pollBackoffMaximumInterval = 60000;
    private boolean disableTypeRegitrationOnStart;
    private boolean disableServiceShutdownOnStop;
    private String identity = ManagementFactory.getRuntimeMXBean().getName();
    private int pollThreadCount = 1;
    private Throttler pollRateThrottler;
        @Override
        public void uncaughtException(Thread tThrowable e) {
            .error("Failure in thread " + t.getName(), e);
        }
    };
    private TaskPoller poller;
    public GenericWorker(AmazonSimpleWorkflow serviceString domainString taskListToPoll) {
        this. = service;
        this. = domain;
        this. = taskListToPoll;
    }
    public GenericWorker() {
         = ManagementFactory.getRuntimeMXBean().getName();
        int length = Math.min(.length(), .);
         = .substring(0, length);
    }
    @Override
    public AmazonSimpleWorkflow getService() {
        return ;
    }
    public void setService(AmazonSimpleWorkflow service) {
        this. = service;
    }
    @Override
    public String getDomain() {
        return ;
    }
    public void setDomain(String domain) {
        this. = domain;
    }
    @Override
    public boolean isRegisterDomain() {
        return ;
    }

    
Should domain be registered on startup. Default is false. When enabled setDomainRetentionPeriodInDays(long) property is required.
    @Override
    public void setRegisterDomain(boolean registerDomain) {
        this. = registerDomain;
    }
    @Override
    public long getDomainRetentionPeriodInDays() {
        return ;
    }
    @Override
    public void setDomainRetentionPeriodInDays(long domainRetentionPeriodInDays) {
        this. = domainRetentionPeriodInDays;
    }
    @Override
    public String getTaskListToPoll() {
        return ;
    }
    public void setTaskListToPoll(String taskListToPoll) {
        this. = taskListToPoll;
    }
    @Override
    public double getMaximumPollRatePerSecond() {
        return ;
    }
    @Override
    public void setMaximumPollRatePerSecond(double maximumPollRatePerSecond) {
        this. = maximumPollRatePerSecond;
    }
    @Override
    }
    @Override
    public void setMaximumPollRateIntervalMilliseconds(int maximumPollRateIntervalMilliseconds) {
        this. = maximumPollRateIntervalMilliseconds;
    }
    @Override
        return ;
    }
    @Override
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
        this. = uncaughtExceptionHandler;
    }
    @Override
    public String getIdentity() {
        return ;
    }
    @Override
    public void setIdentity(String identity) {
        this. = identity;
    }
    @Override
    public long getPollBackoffInitialInterval() {
        return ;
    }
    @Override
    public void setPollBackoffInitialInterval(long backoffInitialInterval) {
        if (backoffInitialInterval < 0) {
            throw new IllegalArgumentException("expected value should be positive or 0: " + backoffInitialInterval);
        }
        this. = backoffInitialInterval;
    }
    @Override
    public long getPollBackoffMaximumInterval() {
        return ;
    }
    @Override
    public void setPollBackoffMaximumInterval(long backoffMaximumInterval) {
        if (backoffMaximumInterval <= 0) {
            throw new IllegalArgumentException("expected value should be positive: " + backoffMaximumInterval);
        }
        this. = backoffMaximumInterval;
    }

    
    @Override
    public boolean isDisableServiceShutdownOnStop() {
        return ;
    }

    
By default when

Parameters:
disableServiceShutdownOnStop true means do not call com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow.shutdown()
:
link shutdown() or @{link shutdownNow() is called the worker calls com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow.shutdown() on the service instance it is configured with before shutting down internal thread pools. Otherwise threads that are waiting on a poll request might block shutdown for the duration of a poll. This flag allows disabling this behavior.
    @Override
    public void setDisableServiceShutdownOnStop(boolean disableServiceShutdownOnStop) {
        this. = disableServiceShutdownOnStop;
    }
    @Override
    public double getPollBackoffCoefficient() {
        return ;
    }
    @Override
    public void setPollBackoffCoefficient(double backoffCoefficient) {
        if (backoffCoefficient < 1.0) {
            throw new IllegalArgumentException("expected value should be bigger or equal to 1.0: " + backoffCoefficient);
        }
        this. = backoffCoefficient;
    }
    @Override
    public int getPollThreadCount() {
        return ;
    }
    @Override
    public void setPollThreadCount(int threadCount) {
        checkStarted();
        this. = threadCount;
    }
    @Override
    public void setDisableTypeRegistrationOnStart(boolean disableTypeRegistrationOnStart) {
        this. = disableTypeRegistrationOnStart;
    }
    @Override
    public boolean isDisableTypeRegistrationOnStart() {
        return ;
    }
    @Override
    public void start() {
        if (.isInfoEnabled()) {
            .info("start: " + toString());
        }
        checkStarted();
        checkRequiredProperty("service");
        checkRequiredProperty("domain");
        checkRequiredProperty("taskListToPoll");
        checkRequredProperties();
        if () {
            registerDomain();
        }
        if (!) {
            registerTypesToPoll();
        }
        if ( > 0.0) {
             = new Throttler("pollRateThrottler " + ,
                    );
        }
                new LinkedBlockingQueue<Runnable>());
        ExecutorThreadFactory pollExecutorThreadFactory = getExecutorThreadFactory();
        .setThreadFactory(pollExecutorThreadFactory);
                );
         = createPoller();
        for (int i = 0; i < i++) {
            .execute(new PollServiceTask());
        }
    }
        ExecutorThreadFactory pollExecutorThreadFactory = new ExecutorThreadFactory(getPollThreadNamePrefix());
        return pollExecutorThreadFactory;
    }
    protected abstract String getPollThreadNamePrefix();
    protected abstract TaskPoller createPoller();
    protected abstract void checkRequredProperties();
    private void registerDomain() {
        if ( == .) {
            throw new IllegalStateException("required property domainRetentionPeriodInSeconds is not set");
        }
        try {
                    String.valueOf()));
        }
        catch (DomainAlreadyExistsException e) {
            if (.isTraceEnabled()) {
                .trace("Domain is already registered: " + );
            }
        }
    }
    protected void checkRequiredProperty(Object valueString name) {
        if (value == null) {
            throw new IllegalStateException("required property " + name + " is not set");
        }
    }
    protected void checkStarted() {
        if (isStarted()) {
            throw new IllegalStateException("started");
        }
    }
    private boolean isStarted() {
        return  != null;
    }
    @Override
    public void shutdown() {
        if (.isInfoEnabled()) {
            .info("shutdown");
        }
        if (!isStarted()) {
            return;
        }
        if (!) {
            .shutdown();
        }
        .shutdown();
        .shutdown();
    }
    @Override
    public void shutdownNow() {
        if (.isInfoEnabled()) {
            .info("shutdownNow");
        }
        if (!isStarted()) {
            return;
        }
        if (!) {
            .shutdown();
        }
        .shutdownNow();
        .shutdownNow();
    }
    @Override
    public boolean awaitTermination(long timeoutTimeUnit unitthrows InterruptedException {
        long start = System.currentTimeMillis();
        boolean terminated = .awaitTermination(timeoutunit);
        long elapsed = System.currentTimeMillis() - start;
        long left = ..convert(timeoutunit) - elapsed;
        return .awaitTermination(left.) && terminated;
    }
    @Override
    public boolean shutdownAndAwaitTermination(long timeoutTimeUnit unitthrows InterruptedException {
        if (!isStarted()) {
            return true;
        }
        long start = System.currentTimeMillis();
        if (!) {
            .shutdown();
        }
        .shutdownNow();
        try {
            .awaitTermination(timeoutunit);
        }
        finally {
            .shutdown();
        }
        long elapsed = System.currentTimeMillis() - start;
        long left = ..convert(timeoutunit) - elapsed;
        return awaitTermination(left.);
    }
    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "[service=" +  + ", domain=" +  + ", taskListToPoll="
                +  + ", identity=" +  + ", backoffInitialInterval=" + 
                + ", backoffMaximumInterval=" +  + ", backoffCoefficient=" + 
                + "]";
    }
    @Override
    public boolean isRunning() {
        return isStarted() && !.isTerminated();
    }
    @Override
    public void suspendPolling() {
        if (.isInfoEnabled()) {
            .info("suspendPolling");
        }
        .set(new CountDownLatch(1));
    }
    @Override
    public void resumePolling() {
        if (.isInfoEnabled()) {
            .info("resumePolling");
        }
        CountDownLatch existing = .getAndSet(null);
        if (existing != null) {
            existing.countDown();
        }
    }
New to GrepCode? Check out our FAQ X