Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  http://aws.amazon.com/apache2.0
   *
  * or in the "license" file accompanying this file. This file is distributed
  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
 package com.amazonaws.services.simpleworkflow.flow.worker;
 
 
 
 
 public class SynchronousActivityTaskPoller implements TaskPoller {
 
     private static final Log log = LogFactory.getLog(SynchronousActivityTaskPoller.class);
 
     private AmazonSimpleWorkflow service;
 
     private String domain;
 
     private String taskListToPoll;
 
 
     private String identity;
 
 
 
     private boolean initialized;
 
     public SynchronousActivityTaskPoller(AmazonSimpleWorkflow serviceString domainString taskListToPoll,
             ActivityImplementationFactory activityImplementationFactory) {
         this();
         this. = service;
         this. = domain;
         this. = taskListToPoll;
         this. = activityImplementationFactory;
     }
 
     public SynchronousActivityTaskPoller() {
          = ManagementFactory.getRuntimeMXBean().getName();
         int length = Math.min(.length(), .);
          = .substring(0, length);
     }
 
     public AmazonSimpleWorkflow getService() {
         return ;
     }
 
     public void setService(AmazonSimpleWorkflow service) {
         this. = service;
     }
 
     public String getDomain() {
         return ;
     }
 
     public void setDomain(String domain) {
         this. = domain;
     }
 
     public String getPollTaskList() {
         return ;
     }
 
     public void setTaskListToPoll(String taskList) {
         this. = taskList;
    }
        return ;
    }
    public void setActivityImplementationFactory(ActivityImplementationFactory activityImplementationFactory) {
        this. = activityImplementationFactory;
    }
    public String getIdentity() {
        return ;
    }
    public void setIdentity(String identity) {
        this. = identity;
    }
    }
    public void setReportCompletionRetryParameters(ExponentialRetryParameters reportCompletionRetryParameters) {
        this. = new SynchronousRetrier(reportCompletionRetryParametersUnknownResourceException.class);
    }
        return .getRetryParameters();
    }
    public void setReportFailureRetryParameters(ExponentialRetryParameters reportFailureRetryParameters) {
        this. = new SynchronousRetrier(reportFailureRetryParametersUnknownResourceException.class);
    }
    public String getTaskListToPoll() {
        return ;
    }

    
Poll for a task using getPollTimeoutInSeconds()

Returns:
null if poll timed out
    public ActivityTask poll() {
        if (!) {
            checkRequiredProperty("service");
            checkRequiredProperty("domain");
            checkRequiredProperty("taskListToPoll");
             = true;
        }
        PollForActivityTaskRequest pollRequest = new PollForActivityTaskRequest();
        pollRequest.setDomain();
        pollRequest.setIdentity();
        pollRequest.setTaskList(new TaskList().withName());
        if (.isDebugEnabled()) {
            .debug("poll request begin: " + pollRequest);
        }
        ActivityTask result = .pollForActivityTask(pollRequest);
        if (result == null || result.getTaskToken() == null) {
            if (.isDebugEnabled()) {
                .debug("poll request returned no task");
            }
            return null;
        }
        if (.isTraceEnabled()) {
            .trace("poll request returned " + result);
        }
        return result;
    }

    
Poll for a activity task and execute correspondent implementation.

Returns:
true if task was polled and decided upon, false if poll timed out
Throws:
java.lang.Exception
    @Override
    public boolean pollAndProcessSingleTask() throws Exception {
        ActivityTask task = poll();
        if (task == null) {
            return false;
        }
        execute(task);
        return true;
    }
    protected void execute(final ActivityTask taskthrows Exception {
        String output = null;
        ActivityType activityType = task.getActivityType();
        try {
            ActivityExecutionContext context = new ActivityExecutionContextImpl(task);
            ActivityImplementation activityImplementation = .getActivityImplementation(activityType);
            if (activityImplementation == null) {
                throw new ActivityFailureException("Unknown activity type: " + activityType);
            }
            output = activityImplementation.execute(context);
            if (!activityImplementation.getExecutionOptions().isManualActivityCompletion()) {
                respondActivityTaskCompletedWithRetry(task.getTaskToken(), output);
            }
        }
        catch (CancellationException e) {
            respondActivityTaskCanceledWithRetry(task.getTaskToken(), null);
            return;
        }
        catch (ActivityFailureException e) {
            if (.isErrorEnabled()) {
                .error("Failure processing activity task with taskId=" + task.getStartedEventId() + ", workflowGenerationId="
                        + task.getWorkflowExecution().getWorkflowId() + ", activity=" + activityType
                        + ", activityInstanceId=" + task.getActivityId(), e);
            }
            respondActivityTaskFailedWithRetry(task.getTaskToken(), e.getReason(), e.getDetails());
        }
        catch (Exception e) {
            if (.isErrorEnabled()) {
                .error("Failure processing activity task with taskId=" + task.getStartedEventId() + ", workflowGenerationId="
                        + task.getWorkflowExecution().getWorkflowId() + ", activity=" + activityType
                        + ", activityInstanceId=" + task.getActivityId(), e);
            }
            String reason = e.getMessage();
            StringWriter sw = new StringWriter();
            e.printStackTrace(new PrintWriter(sw));
            String details = sw.toString();
            respondActivityTaskFailedWithRetry(task.getTaskToken(), reasondetails);
        }
    }
    protected void respondActivityTaskFailedWithRetry(final String taskTokenfinal String reasonfinal String details) {
        if ( == null) {
            respondActivityTaskFailed(taskTokenreasondetails);
        }
        else {
            .retry(new Runnable() {
                @Override
                public void run() {
                    respondActivityTaskFailed(taskTokenreasondetails);
                }
            });
        }
    }
    protected void respondActivityTaskFailed(String taskTokenString reasonString details) {
        RespondActivityTaskFailedRequest failedResponse = new RespondActivityTaskFailedRequest();
        failedResponse.setTaskToken(taskToken);
        failedResponse.setReason(WorkflowExecutionUtils.truncateReason(reason));
        failedResponse.setDetails(details);
        .respondActivityTaskFailed(failedResponse);
    }
    protected void respondActivityTaskCanceledWithRetry(final String taskTokenfinal String details) {
        if ( == null) {
            respondActivityTaskCanceled(taskTokendetails);
        }
        else {
            .retry(new Runnable() {
                @Override
                public void run() {
                    respondActivityTaskCanceled(taskTokendetails);
                }
            });
        }
    }
    protected void respondActivityTaskCanceled(String taskTokenString details) {
        canceledResponse.setTaskToken(taskToken);
        canceledResponse.setDetails(details);
        .respondActivityTaskCanceled(canceledResponse);
    }
    protected void respondActivityTaskCompletedWithRetry(final String taskTokenfinal String output) {
        if ( == null) {
            respondActivityTaskCompleted(taskTokenoutput);
        }
        else {
            .retry(new Runnable() {
                @Override
                public void run() {
                    respondActivityTaskCompleted(taskTokenoutput);
                }
            });
        }
    }
    protected void respondActivityTaskCompleted(String taskTokenString output) {
        completedReponse.setTaskToken(taskToken);
        completedReponse.setResult(output);
        .respondActivityTaskCompleted(completedReponse);
    }
    protected void checkRequiredProperty(Object valueString name) {
        if (value == null) {
            throw new IllegalStateException("required property " + name + " is not set");
        }
    }
    @Override
    public void shutdown() {
    }
    @Override
    public void shutdownNow() {
    }
    @Override
    public boolean awaitTermination(long leftTimeUnit millisecondsthrows InterruptedException {
        //TODO: Waiting for all currently running pollAndProcessSingleTask to complete 
        return true;
    }
New to GrepCode? Check out our FAQ X