Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  http://aws.amazon.com/apache2.0
   *
  * or in the "license" file accompanying this file. This file is distributed
  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
 package com.amazonaws.services.simpleworkflow.flow.worker;
 
 
 
 
 
     // Logger for this class.
     private static final Log log = LogFactory.getLog(ActivityTaskPoller.class);
 
 
     // Semaphore consulted by pollAndProcessSingleTask() before polling. A new instance sized to
     // the executor's maximum pool size is created in setTaskExecutorService(...)
     // (NOTE(review): presumably assigned to this field - the assignment target is not visible; confirm).
     protected Semaphore pollSemaphore;
 
 
         @Override
         public void uncaughtException(Thread tThrowable e) {
             .error("Failure in thread " + t.getName(), e);
         }
     };
 
     public ActivityTaskPoller(AmazonSimpleWorkflow serviceString domainString pollTaskList,
             ActivityImplementationFactory activityImplementationFactoryThreadPoolExecutor taskExecutorService) {
         super(servicedomainpollTaskListactivityImplementationFactory);
         setTaskExecutorService(taskExecutorService);
     }
 
         return ;
     }
 
     public void setTaskExecutorService(ThreadPoolExecutor taskExecutorService) {
         this. = taskExecutorService;
          = new Semaphore(taskExecutorService.getMaximumPoolSize());
     }

    
     /**
      * Polls for an activity task and executes the corresponding implementation using the provided executor service.
      *
      * @return {@code true} if a task was polled and handed to the executor, {@code false} if the poll timed out
      * @throws java.lang.Exception if polling or submitting the task fails
      */
 
     @Override
     public boolean pollAndProcessSingleTask() throws Exception {
         boolean semaphoreNeedsRelease = false;
         try {
             // Without semaphore task that was polled from a service
             // can end up waiting on taskExecutor.execute(...) for a long
             // time leading to timeouts and other problems
             if ( != null) {
                 .acquire();
             }
             // we will release the semaphore in a finally clause
             semaphoreNeedsRelease = true;
             final ActivityTask task = poll();
             if (task == null) {
                 return false;
             }
             semaphoreNeedsRelease = false;
             try {
                 .execute(new Runnable() {
 
                     @Override
                     public void run() {
                         try {
                             execute(task);
                         }
                         catch (Throwable ee) {
                             .uncaughtException(Thread.currentThread(), wrapFailure(taskee));
                         }
                         finally {
                             .release();
                         }
                    }
                });
            }
            catch (Exception e) {
                semaphoreNeedsRelease = true;
                throw e;
            } catch (Error e) {
                semaphoreNeedsRelease = true;
                throw e;
            }
        }
        finally {
            if (semaphoreNeedsRelease) {
                .release();
            }
        }
        return true;
    }
    private Exception wrapFailure(final ActivityTask taskThrowable failure) {
        WorkflowExecution execution = task.getWorkflowExecution();
        RuntimeException e2 = new RuntimeException(
                "Failure taskId=\"" + task.getStartedEventId() + "\" workflowExecutionRunId=\"" + execution.getRunId()
                        + "\" workflowExecutionId=\"" + execution.getWorkflowId() + "\""failure);
        return e2;
    }
    @Override
    public void shutdown() {
        .shutdown();
    }
    @Override
    public void shutdownNow() {
    }
    @Override
    public boolean awaitTermination(long timeoutTimeUnit unitthrows InterruptedException {
        return .awaitTermination(timeoutunit);
    }
New to GrepCode? Check out our FAQ X