Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   * 
   * Licensed under the Apache License, Version 2.0 (the "License"). You may not
   * use this file except in compliance with the License. A copy of the License is
   * located at
   * 
   * http://aws.amazon.com/apache2.0
   * 
  * or in the "license" file accompanying this file. This file is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
 package com.amazonaws.services.simpleworkflow.flow.worker;
 
 import java.util.List;
 
 
 
 class AsyncDecider {
 
     private static abstract class WorkflowAsyncScope extends AsyncScope {
 
         public WorkflowAsyncScope() {
             super(falsetrue);
         }
 
         public abstract Promise<StringgetOutput();
 
     }
 
     /**
      * Scope created from a workflow history event; it reads the event's
      * WorkflowExecutionStarted attributes in its constructor and exposes an
      * output promise through {@link #getOutput()}.
      *
      * NOTE(review): this copy of the file has lost several identifiers
      * (assignment targets, call receivers, an enum constant, and separators
      * inside generic types). The blanks below cannot be restored from this
      * text alone -- recover them from the upstream source before compiling.
      */
     private final class WorkflowExecuteAsyncScope extends WorkflowAsyncScope {
 
         private final WorkflowExecutionStartedEventAttributes attributes;
 
         private Promise<Stringoutput;
 
         /**
          * @param event history event whose WorkflowExecutionStarted attributes are read
          */
         public WorkflowExecuteAsyncScope(HistoryEvent event) {
             // NOTE(review): the expected event-type constant is missing here --
             // presumably the WorkflowExecutionStarted type; confirm.
             assert event.getEventType().equals(..toString());
             // presumably assigns the 'attributes' field -- confirm
             this. = event.getWorkflowExecutionStartedEventAttributes();
         }
 
         @Override
         protected void doAsync() throws Throwable {
             // NOTE(review): target and receivers are missing; presumably stores the
             // promise returned by execute(...) invoked with the start attributes'
             // input -- confirm against upstream.
              = .execute(.getInput());
         }
 
         @Override
         public Promise<StringgetOutput() {
             // presumably returns the 'output' field -- confirm
             return ;
         }
 
     }
 
     private final class UnhandledSignalAsyncScope extends WorkflowAsyncScope {
 
         private final Promise<Stringoutput;
 
         private Throwable failure;
 
         private boolean cancellation;
 
         public UnhandledSignalAsyncScope(Promise<StringoutputThrowable failureboolean cancellation) {
             this. = output;
             this. = failure;
             this. = cancellation;
         }
 
         @Override
         protected void doAsync() throws Throwable {
         }
 
         @Override
         public Promise<StringgetOutput() {
            return ;
        }
        @Override
        public boolean isCancelRequested() {
            return super.isCancelRequested() || ;
        }
        @Override
        public Throwable getFailure() {
            Throwable result = super.getFailure();
            if ( != null) {
                result = ;
            }
            return result;
        }
        @Override
        public boolean eventLoop() throws Throwable {
            boolean completed = super.eventLoop();
            if (completed &&  != null) {
                throw ;
            }
            return completed;
        }
    }
    // Class-wide logger.
    // NOTE(review): the constructor and several methods assign to or call through
    // fields that are not declared in this list; some field declarations appear
    // to be missing from this copy of the file -- restore them from upstream.
    private static final Log log = LogFactory.getLog(AsyncDecider.class);
    // Constructor argument; also the source of the DecisionTask read in the constructor.
    private final HistoryHelper historyHelper;
    // Constructor argument; passed to the client/clock implementations built in the constructor.
    private final DecisionsHelper decisionsHelper;
    private final WorkflowClockImpl workflowClock;
    private final DecisionContext context;
    private boolean cancelRequested;
    private boolean unhandledDecision;
    private boolean completed;
    private Throwable failure;
    /**
     * Builds the decider's collaborators from the supplied helpers: an activity
     * client, a workflow context created from the helper's decision task, a
     * workflow client, and a workflow clock.
     *
     * NOTE(review): the parameter list has lost its separators and closing
     * parenthesis in this copy, and every assignment target is blank. The
     * targets are presumably the like-named fields -- confirm against upstream.
     *
     * @param workflowDefinitionFactory factory stored for later use
     * @param historyHelper             provides the current decision task
     * @param decisionsHelper           passed to the client and clock implementations
     * @throws Exception declared by the signature; no throwing statement is visible here
     */
    public AsyncDecider(WorkflowDefinitionFactory workflowDefinitionFactoryHistoryHelper historyHelper,
            DecisionsHelper decisionsHelperthrows Exception {
        this. = workflowDefinitionFactory;
        this. = historyHelper;
        this. = decisionsHelper;
        this. = new GenericActivityClientImpl(decisionsHelper);
        DecisionTask decisionTask = historyHelper.getDecisionTask();
        // The workflow context is derived from the decision task being handled.
         = new WorkfowContextImpl(decisionTask);
        this. = new GenericWorkflowClientImpl(decisionsHelper);
        this. = new WorkflowClockImpl(decisionsHelper);
    }
    public boolean isCancelRequested() {
        return ;
    }
    /**
     * Creates a {@code WorkflowExecuteAsyncScope} for the given event.
     *
     * NOTE(review): the assignment target is blank in this copy -- presumably
     * the decider's workflow async scope field; confirm against upstream.
     *
     * @param event history event handed to the new scope
     */
    private void handleWorkflowExecutionStarted(HistoryEvent event) {
         = new WorkflowExecuteAsyncScope(event);
    }
    /**
     * Dispatches one history event to the handler for its type. Some types are
     * forwarded to collaborator objects, some to private handlers of this class,
     * and some are deliberately ignored.
     *
     * NOTE(review): every {@code case} label, every collaborator receiver and
     * several whole statements are missing from this copy (note the consecutive
     * {@code break;} lines and the blank assignment targets). The handler names
     * hint at the matching event types, but the mapping must be restored from
     * the upstream source rather than guessed.
     *
     * @param event     the history event to process
     * @param eventType the event's type, used as the switch selector
     * @throws Throwable propagated from the individual handlers
     */
    private void processEvent(HistoryEvent eventEventType eventTypethrows Throwable {
        switch (eventType) {
        case :
            .handleActivityTaskCanceled(event);
            break;
        case :
            .handleActivityTaskCompleted(event);
            break;
        case :
            .handleActivityTaskFailed(event);
            break;
        case :
            break;
        case :
            .handleActivityTaskTimedOut(event);
            break;
            break;
        case :
            break;
        case :
            break;
        case :
            break;
        case :
            break;
            break;
        case :
            break;
        case :
            handleDecisionTaskCompleted(event);
            break;
        case :
            // NOOP
            break;
        case :
            handleDecisionTaskStarted(event);
            break;
        case :
            // Handled in the processEvent(event)
            break;
            break;
        case :
            .handleScheduleActivityTaskFailed(event);
            break;
            break;
            break;
        case :
            handleStartTimerFailed(event);
            break;
        case :
            handleTimerFired(event);
            break;
            handleWorkflowExecutionCancelRequested(event);
            break;
        case :
            handleWorkflowExecutionSignaled(event);
            break;
        case :
            handleWorkflowExecutionStarted(event);
            break;
        case :
            // NOOP
            break;
        case :
            // NOOP
            break;
        case :
            .handleActivityTaskScheduled(event);
            break;
        case :
            break;
        case :
            break;
        case :
            break;
        case :
        	break;
        case :
            break;
        case :
            // NOTE(review): blank target; a boolean field is set to true here -- confirm which.
             = true;
            break;
        case :
            break;
        case :
             = true;
            break;
        case :
            break;
        case :
             = true;
            break;
        case :
            break;
             = true;
            break;
        case :
            handleTimerStarted(event);
            break;
        case :
            .handleTimerCanceled(event);
            break;
            break;
            break;
            break;
            break;
        case :
            .handleCancelTimerFailed(event);
        }
    }
    /**
     * Runs one pass of an event loop and records the outcome. A guard condition
     * short-circuits the call; a {@code CancellationException} and any other
     * {@code Throwable} are caught and recorded rather than propagated.
     *
     * NOTE(review): the guard expression, the event-loop receiver and all
     * assignment targets are blank in this copy. Presumably the targets are the
     * {@code completed} and {@code failure} fields -- confirm against upstream.
     *
     * @param event the event that triggered this pass (not read in the visible body)
     * @throws Throwable declared by the signature
     */
    private void eventLoop(HistoryEvent eventthrows Throwable {
        if () {
            return;
        }
        try {
             = .eventLoop();
        }
        catch (CancellationException e) {
            // The exception is stored only when the (missing) condition is false;
            // the flag below is set unconditionally.
            if (!) {
                 = e;
            }
             = true;
        }
        catch (Throwable e) {
             = e;
             = true;
        }
    }
    /**
     * Issues the closing call for the workflow. Depending on state it calls, in
     * priority order: {@code failWorkflowExecution},
     * {@code cancelWorkflowExecution}, {@code continueAsNewWorkflowExecution}
     * (when continue-as-new parameters are present), or
     * {@code completeWorkflowExecution} with the output value -- or with
     * {@code null} when the output promise is absent or not ready.
     *
     * NOTE(review): the guard conditions, call receivers and call arguments are
     * blank in this copy; restore them from upstream.
     */
    private void completeWorkflow() {
        if ( && !) {
            if ( != null) {
                .failWorkflowExecution();
            }
            else if () {
                .cancelWorkflowExecution();
            }
            else {
                ContinueAsNewWorkflowExecutionParameters continueAsNewOnCompletion = .getContinueAsNewOnCompletion();
                if (continueAsNewOnCompletion != null) {
                    .continueAsNewWorkflowExecution(continueAsNewOnCompletion);
                }
                else {
                    Promise<Stringoutput = .getOutput();
                    // Only a ready promise is dereferenced; otherwise complete with null.
                    if (output != null && output.isReady()) {
                        String workflowOutput = output.get();
                        .completeWorkflowExecution(workflowOutput);
                    }
                    else {
                        .completeWorkflowExecution(null);
                    }
                }
            }
        }
    }
    /**
     * Handler invoked by {@code processEvent}; the body is empty in this copy.
     *
     * NOTE(review): other statements have been lost from this file, so it is
     * unclear whether the empty body is original -- confirm against upstream.
     * The signature has also lost its closing parenthesis.
     *
     * @param event the history event (unused in the visible body)
     */
    private void handleDecisionTaskStarted(HistoryEvent eventthrows Throwable {
    }
    /**
     * Marks cancellation as requested: calls {@code setCancelRequested(true)} on
     * a collaborator and sets a boolean field to {@code true}.
     *
     * NOTE(review): the receiver and the assignment target are blank in this
     * copy -- presumably the {@code cancelRequested} field is the target; confirm.
     *
     * @param event the history event (not read in the visible body)
     */
    private void handleWorkflowExecutionCancelRequested(HistoryEvent eventthrows Throwable {
        .setCancelRequested(true);
         = true;
    }
    /**
     * Handles a start-timer failure; returns early when the timer id equals a
     * particular constant.
     *
     * NOTE(review): {@code attributes} is used without a visible declaration,
     * the compared constant is blank, and nothing follows the early return --
     * statements appear to be missing from this copy; restore from upstream.
     *
     * @param event the history event describing the failure
     */
    private void handleStartTimerFailed(HistoryEvent event) {
        String timerId = attributes.getTimerId();
        if (timerId.equals(.)) {
            return;
        }
    }
    /**
     * Forwards a timer-fired event, identified by its event id and attributes,
     * to a collaborator's {@code handleTimerFired}. Timers whose id equals a
     * particular constant are skipped.
     *
     * NOTE(review): the compared constant and the receiver of the forwarding
     * call are blank in this copy; restore from upstream.
     *
     * @param event the timer-fired history event
     */
    private void handleTimerFired(HistoryEvent eventthrows Throwable {
        TimerFiredEventAttributes attributes = event.getTimerFiredEventAttributes();
        String timerId = attributes.getTimerId();
        if (timerId.equals(.)) {
            return;
        }
        .handleTimerFired(event.getEventId(), attributes);
    }
    /**
     * Forwards a timer-started event to a collaborator's
     * {@code handleTimerStarted}. Timers whose id equals a particular constant
     * are skipped.
     *
     * NOTE(review): the compared constant and the receiver of the forwarding
     * call are blank in this copy; restore from upstream.
     *
     * @param event the timer-started history event
     */
    private void handleTimerStarted(HistoryEvent event) {
        TimerStartedEventAttributes attributes = event.getTimerStartedEventAttributes();
        String timerId = attributes.getTimerId();
        if (timerId.equals(.)) {
            return;
        }
        .handleTimerStarted(event);
    }
    /**
     * Handles a signal event by creating a {@code Task} whose body calls
     * {@code signalRecieved} with the signal's name and input.
     *
     * NOTE(review): this copy is missing the expected event-type constant in the
     * assertion, the condition and most of the body of the {@code if} block
     * (only the tail of a statement survives), the declaration of
     * {@code signalAttributes}, and the receiver of {@code signalRecieved}.
     * Restore them from upstream.
     *
     * @param event the signal history event
     */
    private void handleWorkflowExecutionSignaled(HistoryEvent eventthrows Throwable {
        assert (event.getEventType().equals(..toString()));
        if () {
                    .isCancelRequested());
             = false;
        }
        // This task is attached to the root context of the workflowAsyncScope
        new Task() {
            @Override
            protected void doExecute() throws Throwable {
                .signalRecieved(signalAttributes.getSignalName(), signalAttributes.getInput());
            }
        };
    }
    /**
     * Handler invoked by {@code processEvent}; the body is empty in this copy.
     *
     * NOTE(review): other statements have been lost from this file, so it is
     * unclear whether the empty body is original -- confirm against upstream.
     *
     * @param event the history event (unused in the visible body)
     */
    private void handleDecisionTaskCompleted(HistoryEvent event) {
    }
    public void decide() throws Exception {
        try {
            if ( == null) {
                throw new IllegalStateException("Unknown workflow type: " + .getWorkflowContext().getWorkflowType());
            }
            long lastNonReplayedEventId = .getLastNonReplayEventId();
            // Buffer events until the next DecisionTaskStarted and then process them
            // setting current time to the time of DecisionTaskStarted event
            EventsIterator eventsIterator = .getEvents();
            List<HistoryEventreordered = null;
            do {
                List<HistoryEventdecisionStartToCompletionEvents = new ArrayList<HistoryEvent>();
                List<HistoryEventdecisionCompletionToStartEvents = new ArrayList<HistoryEvent>();
                boolean concurrentToDecision = true;
                int lastDecisionIndex = -1;
                while (eventsIterator.hasNext()) {
                    HistoryEvent event = eventsIterator.next();
                    EventType eventType = EventType.valueOf(event.getEventType());
                    if (eventType == .) {
                        concurrentToDecision = false;
                    }
                    else if (eventType == .) {
                        .handleDecisionTaskStartedEvent();
                        if (!eventsIterator.isNextDecisionTimedOut()) {
                            long replayCurrentTimeMilliseconds = event.getEventTimestamp().getTime();
                            .setReplayCurrentTimeMilliseconds(replayCurrentTimeMilliseconds);
                            break;
                        }
                    }
                    else if (eventType == . || eventType == .) {
                        // skip
                    }
                    else {
                        if (concurrentToDecision) {
                            decisionStartToCompletionEvents.add(event);
                        }
                        else {
                            if (isDecisionEvent(eventType)) {
                                lastDecisionIndex = decisionCompletionToStartEvents.size();
                            }
                            decisionCompletionToStartEvents.add(event);
                        }
                    }
                }
                int size = decisionStartToCompletionEvents.size() + decisionStartToCompletionEvents.size();
                // Reorder events to correspond to the order that decider sees them. 
                // The main difference is that events that were added during decision task execution 
                // should be processed after events that correspond to the decisions. 
                // Otherwise the replay is going to break.
                reordered = new ArrayList<HistoryEvent>(size);
                // First are events that correspond to the previous task decisions
                if (lastDecisionIndex >= 0) {
                    reordered.addAll(decisionCompletionToStartEvents.subList(0, lastDecisionIndex + 1));
                }
                // Second are events that were added during previous task execution
                reordered.addAll(decisionStartToCompletionEvents);
                // The last are events that were added after previous task completion
                if (decisionCompletionToStartEvents.size() > lastDecisionIndex + 1) {
                    reordered.addAll(decisionCompletionToStartEvents.subList(lastDecisionIndex + 1,
                            decisionCompletionToStartEvents.size()));
                }
                for (HistoryEvent event : reordered) {
                    if (event.getEventId() >= lastNonReplayedEventId) {
                        .setReplaying(false);
                    }
                    EventType eventType = EventType.valueOf(event.getEventType());
                    processEvent(eventeventType);
                    eventLoop(event);
                }
                completeWorkflow();
            }
            while (eventsIterator.hasNext());
            if () {
                 = false;
                completeWorkflow();
            }
        }
        catch (AmazonServiceException e) {
            // We don't want to fail workflow on service exceptions like 500 or throttling
            // Throwing from here drops decision task which is OK as it is rescheduled after its StartToClose timeout.
            if (e.getErrorType() == . && !"ThrottlingException".equals(e.getErrorCode())) {
                if (.isErrorEnabled()) {
                    .error("Failing workflow " + .getWorkflowExecution(), e);
                }
                .failWorkflowDueToUnexpectedError(e);
            }
            else {
                throw e;
            }
        }
        catch (Throwable e) {
            if (.isErrorEnabled()) {
                .error("Failing workflow " + .getWorkflowExecution(), e);
            }
        }
        finally {
            try {
            }
            catch (WorkflowException e) {
                .setWorkflowContextData(e.getDetails());
            }
            catch (Throwable e) {
                .setWorkflowContextData(e.getMessage());
            }
        }
    }
    /**
     * Tells whether an event type belongs to a fixed set of types; used by
     * {@code decide()} to find the last decision-related event in a batch.
     *
     * NOTE(review): all seventeen {@code case} labels are blank in this copy,
     * so the actual set of event types must be restored from upstream.
     *
     * @param eventType the event type to classify
     * @return {@code true} for the listed types, {@code false} otherwise
     */
    private boolean isDecisionEvent(EventType eventType) {
        switch (eventType) {
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
        case :
            return true;
        default:
            return false;
        }
    }
    }
    }
    /**
     * Precondition check for taking an asynchronous thread dump.
     *
     * NOTE(review): the null-checked expression and both call receivers are
     * blank in this copy; restore them from upstream.
     *
     * @throws IllegalStateException if the checked reference is {@code null}
     *         ("workflow hasn't started yet"), or if the workflow is reported as
     *         failed, in which case the failure cause is attached
     */
    private void checkAsynchronousThreadDumpState() {
        if ( == null) {
            throw new IllegalStateException("workflow hasn't started yet");
        }
        if (.isWorkflowFailed()) {
            throw new IllegalStateException("Cannot get AsynchronousThreadDump of a failed workflow",
                    .getWorkflowFailureCause());
        }
    }
        return ;
    }
        return ;
    }
New to GrepCode? Check out our FAQ X