// NOTE(review): the lines that originally appeared here — "Start line:",
// "End line:", "Snippet Preview", "Snippet HTML Code", "Stack Overflow
// Questions" — were web-scrape/HTML-extraction residue, not part of the
// Java source. They are preserved in this comment for provenance only.
   /*
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package com.facebook.presto.execution;
  
  
  
import com.google.common.base.Functions;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.airlift.concurrent.SetThreadName;
import io.airlift.log.Logger;
import io.airlift.stats.Distribution;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
import static com.facebook.presto.spi.StandardErrorCode.INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.util.Failures.checkCondition;
import static com.facebook.presto.util.Failures.toFailures;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Predicates.equalTo;
import static com.google.common.collect.Iterables.all;
import static com.google.common.collect.Iterables.any;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.units.DataSize.Unit.BYTE;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
  
  public final class SqlStageExecution
         implements StageExecutionNode
 {
     private static final Logger log = Logger.get(SqlStageExecution.class);
 
     // NOTE: DO NOT call methods on the parent while holding a lock on the child.  Locks
     // are always acquired top down in the tree, so calling a method on the parent while
     // holding a lock on the 'this' could cause a deadlock.
     // This is only here to aid in debugging
     @Nullable
     @SuppressWarnings({"FieldCanBeLocal""unused"})
     private final StageExecutionNode parent;
     private final StageId stageId;
     private final URI location;
     private final PlanFragment fragment;
     private final Set<PlanNodeIdallSources;
     private final Map<PlanFragmentIdStageExecutionNodesubStages;
 
     private final Multimap<NodeTaskIdlocalNodeTaskMap = HashMultimap.create();
     private final ConcurrentMap<TaskIdRemoteTasktasks = new ConcurrentHashMap<>();
 
     private final Optional<SplitSourcedataSource;
     private final RemoteTaskFactory remoteTaskFactory;
     private final Session session// only used for remote task factory
     private final int splitBatchSize;
 
     private final int initialHashPartitions;
 
     private final StateMachine<StageStatestageState;
 
     private final LinkedBlockingQueue<ThrowablefailureCauses = new LinkedBlockingQueue<>();
 
     private final Set<PlanNodeIdcompleteSources = newConcurrentHashSet();
 
     @GuardedBy("this")
     @GuardedBy("this")
     private OutputBuffers nextOutputBuffers;
 
     private final ExecutorService executor;
 
     private final AtomicReference<DateTimeschedulingComplete = new AtomicReference<>();
 
     private final Distribution getSplitDistribution = new Distribution();
     private final Distribution scheduleTaskDistribution = new Distribution();
     private final Distribution addSplitDistribution = new Distribution();
 
     private final NodeSelector nodeSelector;
     private final NodeTaskMap nodeTaskMap;
 
     // Note: atomic is needed to assure thread safety between constructor and scheduler thread
     private final AtomicReference<Multimap<PlanNodeIdURI>> exchangeLocations = new AtomicReference<>(ImmutableMultimap.<PlanNodeIdURI>of());
 
     public SqlStageExecution(QueryId queryId,
             LocationFactory locationFactory,
             StageExecutionPlan plan,
             NodeScheduler nodeScheduler,
             RemoteTaskFactory remoteTaskFactory,
             Session session,
             int splitBatchSize,
             int initialHashPartitions,
             ExecutorService executor,
             NodeTaskMap nodeTaskMap,
             OutputBuffers nextOutputBuffers)
     {
         this(null,
                 queryId,
                 new AtomicInteger(),
                 locationFactory,
                 plan,
                 nodeScheduler,
                 remoteTaskFactory,
                 session,
                 splitBatchSize,
                 initialHashPartitions,
                 executor,
                 nodeTaskMap);
 
         // add a single output buffer
         this. = nextOutputBuffers;
     }
 
     private SqlStageExecution(@Nullable StageExecutionNode parent,
             QueryId queryId,
             AtomicInteger nextStageId,
             LocationFactory locationFactory,
             StageExecutionPlan plan,
             NodeScheduler nodeScheduler,
             RemoteTaskFactory remoteTaskFactory,
             Session session,
             int splitBatchSize,
             int initialHashPartitions,
             ExecutorService executor,
             NodeTaskMap nodeTaskMap)
     {
         checkNotNull(queryId"queryId is null");
         checkNotNull(nextStageId"nextStageId is null");
         checkNotNull(locationFactory"locationFactory is null");
         checkNotNull(plan"plan is null");
         checkNotNull(nodeScheduler"nodeScheduler is null");
         checkNotNull(remoteTaskFactory"remoteTaskFactory is null");
         checkNotNull(session"session is null");
         checkArgument(initialHashPartitions > 0, "initialHashPartitions must be greater than 0");
         checkNotNull(executor"executor is null");
         checkNotNull(nodeTaskMap"nodeTaskMap is null");
 
         this. = new StageId(queryId, String.valueOf(nextStageId.getAndIncrement()));
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             this. = parent;
             this. = locationFactory.createStageLocation();
             this. = plan.getFragment();
             this. = plan.getDataSource();
             this. = remoteTaskFactory;
             this. = session;
             this. = splitBatchSize;
             this. = initialHashPartitions;
             this. = executor;
 
             this. = Stream.concat(
                     Stream.of(.getPartitionedSource()),
                     .getRemoteSourceNodes().stream()
                             .map(RemoteSourceNode::getId))
                     .filter(Objects::nonNull)
                     .collect(Collectors.toSet());
 
             ImmutableMap.Builder<PlanFragmentIdStageExecutionNodesubStages = ImmutableMap.builder();
             for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
                 PlanFragmentId subStageFragmentId = subStagePlan.getFragment().getId();
                 StageExecutionNode subStage = new SqlStageExecution(this,
                         queryId,
                         nextStageId,
                         locationFactory,
                         subStagePlan,
                         nodeScheduler,
                         remoteTaskFactory,
                         session,
                         splitBatchSize,
                         initialHashPartitions,
                         executor,
                         nodeTaskMap);
 
                 subStage.addStateChangeListener(stageState -> doUpdateState());
 
                 subStages.put(subStageFragmentIdsubStage);
             }
             this. = subStages.build();
 
             String dataSourceName = .isPresent() ? .get().getDataSourceName() : null;
             this. = nodeScheduler.createNodeSelector(dataSourceName);
             this. = nodeTaskMap;
              = new StateMachine<>("stage " + this..);
             .addStateChangeListener(state -> .debug("Stage %s is %s"state));
         }
     }
 
     @Override
     public void cancelStage(StageId stageId)
     {
         try (SetThreadName ignored = new SetThreadName("Stage-%s"stageId)) {
             if (stageId.equals(this.)) {
                 cancel();
             }
             else {
                 for (StageExecutionNode subStage : .values()) {
                     subStage.cancelStage(stageId);
                 }
             }
         }
     }
 
     @Override
     public StageState getState()
     {
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             return .get();
         }
     }
 
     @Override
     public long getTotalMemoryReservation()
     {
         long memory = 0;
         for (RemoteTask task : .values()) {
             memory += task.getTaskInfo().getStats().getMemoryReservation().toBytes();
         }
         for (StageExecutionNode subStage : .values()) {
             memory += subStage.getTotalMemoryReservation();
         }
         return memory;
     }
 
     @Override
     public StageInfo getStageInfo()
     {
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             // stage state must be captured first in order to provide a
             // consistent view of the stage For example, building this
             // information, the stage could finish, and the task states would
             // never be visible.
             StageState state = .get();
 
             List<TaskInfotaskInfos = .values().stream()
                     .map(RemoteTask::getTaskInfo)
                     .collect(toImmutableList());
 
             List<StageInfosubStageInfos = .values().stream()
                     .map(StageExecutionNode::getStageInfo)
                     .collect(toImmutableList());
 
             int totalTasks = taskInfos.size();
             int runningTasks = 0;
             int completedTasks = 0;
 
             int totalDrivers = 0;
             int queuedDrivers = 0;
             int runningDrivers = 0;
             int completedDrivers = 0;
 
             long totalMemoryReservation = 0;
 
             long totalScheduledTime = 0;
             long totalCpuTime = 0;
             long totalUserTime = 0;
             long totalBlockedTime = 0;
 
             long rawInputDataSize = 0;
             long rawInputPositions = 0;
 
             long processedInputDataSize = 0;
             long processedInputPositions = 0;
 
             long outputDataSize = 0;
             long outputPositions = 0;
 
             for (TaskInfo taskInfo : taskInfos) {
                 if (taskInfo.getState().isDone()) {
                     completedTasks++;
                 }
                 else {
                     runningTasks++;
                 }
 
                 TaskStats taskStats = taskInfo.getStats();
 
                 totalDrivers += taskStats.getTotalDrivers();
                 queuedDrivers += taskStats.getQueuedDrivers();
                 runningDrivers += taskStats.getRunningDrivers();
                 completedDrivers += taskStats.getCompletedDrivers();
 
                 totalMemoryReservation += taskStats.getMemoryReservation().toBytes();
 
                 totalScheduledTime += taskStats.getTotalScheduledTime().roundTo();
                 totalCpuTime += taskStats.getTotalCpuTime().roundTo();
                 totalUserTime += taskStats.getTotalUserTime().roundTo();
                 totalBlockedTime += taskStats.getTotalBlockedTime().roundTo();
 
                 rawInputDataSize += taskStats.getRawInputDataSize().toBytes();
                 rawInputPositions += taskStats.getRawInputPositions();
 
                 processedInputDataSize += taskStats.getProcessedInputDataSize().toBytes();
                 processedInputPositions += taskStats.getProcessedInputPositions();
 
                 outputDataSize += taskStats.getOutputDataSize().toBytes();
                 outputPositions += taskStats.getOutputPositions();
             }
 
             StageStats stageStats = new StageStats(
                     .get(),
                     .snapshot(),
                     .snapshot(),
                     .snapshot(),
 
                     totalTasks,
                     runningTasks,
                     completedTasks,
 
                     totalDrivers,
                     queuedDrivers,
                     runningDrivers,
                     completedDrivers,
 
                     new DataSize(totalMemoryReservation).convertToMostSuccinctDataSize(),
                     new Duration(totalScheduledTime).convertToMostSuccinctTimeUnit(),
                     new Duration(totalCpuTime).convertToMostSuccinctTimeUnit(),
                     new Duration(totalUserTime).convertToMostSuccinctTimeUnit(),
                     new Duration(totalBlockedTime).convertToMostSuccinctTimeUnit(),
                     new DataSize(rawInputDataSize).convertToMostSuccinctDataSize(),
                     rawInputPositions,
                     new DataSize(processedInputDataSize).convertToMostSuccinctDataSize(),
                     processedInputPositions,
                     new DataSize(outputDataSize).convertToMostSuccinctDataSize(),
                     outputPositions);
 
             return new StageInfo(,
                     state,
                     ,
                     ,
                     .getTypes(),
                     stageStats,
                     taskInfos,
                     subStageInfos,
                     toFailures());
         }
     }
 
     @Override
     public synchronized void parentTasksAdded(List<TaskIdparentTasksboolean noMoreParentNodes)
     {
         checkNotNull(parentTasks"parentTasks is null");
 
         // get the current buffers
         OutputBuffers startingOutputBuffers =  != null ?  : ;
 
         // add new buffers
         OutputBuffers newOutputBuffers;
         if (.getOutputPartitioning() == .) {
             ImmutableMap.Builder<TaskIdPagePartitionFunctionnewBuffers = ImmutableMap.builder();
             for (TaskId taskId : parentTasks) {
                 newBuffers.put(taskIdnew UnpartitionedPagePartitionFunction());
             }
             newOutputBuffers = startingOutputBuffers.withBuffers(newBuffers.build());
 
             // no more flag
             if (noMoreParentNodes) {
                 newOutputBuffers = newOutputBuffers.withNoMoreBufferIds();
             }
         }
         else if (.getOutputPartitioning() == .) {
             checkArgument(noMoreParentNodes"Hash partitioned output requires all parent nodes be added in a single call");
 
             ImmutableMap.Builder<TaskIdPagePartitionFunctionbuffers = ImmutableMap.builder();
             for (int nodeIndex = 0; nodeIndex < parentTasks.size(); nodeIndex++) {
                 TaskId taskId = parentTasks.get(nodeIndex);
                 buffers.put(taskIdnew HashPagePartitionFunction(nodeIndexparentTasks.size(), getPartitioningChannels(), getHashChannel(), .getTypes()));
             }
 
             newOutputBuffers = startingOutputBuffers
                     .withBuffers(buffers.build())
                     .withNoMoreBufferIds();
         }
         else {
             throw new UnsupportedOperationException("Unsupported output partitioning " + .getOutputPartitioning());
         }
 
         // only notify scheduler and tasks if the buffers changed
         if (newOutputBuffers.getVersion() != startingOutputBuffers.getVersion()) {
             this. = newOutputBuffers;
             this.notifyAll();
         }
     }
 
     public synchronized OutputBuffers getCurrentOutputBuffers()
     {
         return ;
     }
 
     public synchronized OutputBuffers updateToNextOutputBuffers()
     {
         if ( == null) {
             return ;
         }
 
          = null;
         this.notifyAll();
         return ;
     }
 
     @Override
     public void addStateChangeListener(StateChangeListener<StageStatestateChangeListener)
     {
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             .addStateChangeListener(stateChangeListener::stateChanged);
         }
     }
 
     {
         Multimap<PlanNodeIdURIexchangeLocations = this..get();
 
         ImmutableMultimap.Builder<PlanNodeIdURInewExchangeLocations = ImmutableMultimap.builder();
         for (RemoteSourceNode remoteSourceNode : .getRemoteSourceNodes()) {
             for (PlanFragmentId planFragmentId : remoteSourceNode.getSourceFragmentIds()) {
                 StageExecutionNode subStage = .get(planFragmentId);
                 checkState(subStage != null"Unknown sub stage %s, known stages %s"planFragmentId.keySet());
 
                 // add new task locations
                 for (URI taskLocation : subStage.getTaskLocations()) {
                     if (!exchangeLocations.containsEntry(remoteSourceNode.getId(), taskLocation)) {
                         newExchangeLocations.putAll(remoteSourceNode.getId(), taskLocation);
                     }
                 }
             }
         }
         return newExchangeLocations.build();
     }
 
     @Override
     public synchronized List<URIgetTaskLocations()
     {
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             ImmutableList.Builder<URIlocations = ImmutableList.builder();
             for (RemoteTask task : .values()) {
                 locations.add(task.getTaskInfo().getSelf());
             }
             return locations.build();
         }
     }
 
     public List<RemoteTaskgetAllTasks()
     {
         return ImmutableList.copyOf(.values());
     }
 
     public List<RemoteTaskgetTasks(Node node)
     {
         return FluentIterable.from(.get(node)).transform(Functions.forMap()).toList();
     }
 
     public Future<?> start()
     {
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             return scheduleStartTasks();
         }
     }
 
     @Override
     public Future<?> scheduleStartTasks()
     {
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             // start sub-stages (starts bottom-up)
             .values().forEach(StageExecutionNode::scheduleStartTasks);
             return .submit(this::startTasks);
         }
     }
 
     private void startTasks()
     {
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             try {
                 checkState(!Thread.holdsLock(this), "Can not start while holding a lock on this");
 
                 // transition to scheduling
                 synchronized (this) {
                     if (!.compareAndSet(..)) {
                         // stage has already been started, has been canceled or has no tasks due to partition pruning
                         return;
                     }
                 }
 
                 // schedule tasks
                 if (.getDistribution() == .) {
                     scheduleFixedNodeCount(1);
                 }
                 else if (.getDistribution() == .) {
                     scheduleFixedNodeCount();
                 }
                 else if (.getDistribution() == .) {
                     scheduleSourcePartitionedNodes();
                 }
                 else if (.getDistribution() == .) {
                     scheduleOnCurrentNode();
                 }
                 else {
                     throw new IllegalStateException("Unsupported partitioning: " + .getDistribution());
                 }
 
                 .set(DateTime.now());
                 .set(.);
 
                 // add the missing exchanges output buffers
                 updateNewExchangesAndBuffers(true);
             }
             catch (Throwable e) {
                 // some exceptions can occur when the query finishes early
                 if (!getState().isDone()) {
                     synchronized (this) {
                         .add(e);
                         .set(.);
                     }
                     .error(e"Error while starting stage %s");
                     abort();
                     if (e instanceof InterruptedException) {
                         Thread.currentThread().interrupt();
                     }
                     throw Throwables.propagate(e);
                 }
                 Throwables.propagateIfInstanceOf(eError.class);
                 .debug(e"Error while starting stage in done query %s");
             }
             finally {
                 doUpdateState();
             }
         }
     }
 
     private void scheduleFixedNodeCount(int nodeCount)
     {
         // create tasks on "nodeCount" random nodes
         List<Nodenodes = .selectRandomNodes(nodeCount);
         checkCondition(!nodes.isEmpty(), "No worker nodes available");
         ImmutableList.Builder<TaskIdtasks = ImmutableList.builder();
         for (int taskId = 0; taskId < nodes.size(); taskId++) {
             Node node = nodes.get(taskId);
             RemoteTask task = scheduleTask(taskIdnode);
             tasks.add(task.getTaskInfo().getTaskId());
         }
 
         // tell sub stages about all nodes and that there will not be more nodes
         for (StageExecutionNode subStage : .values()) {
             subStage.parentTasksAdded(tasks.build(), true);
         }
     }
 
     private void scheduleOnCurrentNode()
     {
         // create task on current node
         Node node = .selectCurrentNode();
         RemoteTask task = scheduleTask(0, node);
 
         // tell sub stages about all nodes and that there will not be more nodes
         for (StageExecutionNode subStage : .values()) {
             subStage.parentTasksAdded(ImmutableList.of(task.getTaskInfo().getTaskId()), true);
         }
     }
 
     private void scheduleSourcePartitionedNodes()
             throws InterruptedException
     {
         AtomicInteger nextTaskId = new AtomicInteger(0);
 
         try (SplitSource splitSource = this..get()) {
             while (!splitSource.isFinished()) {
                 // if query has been canceled, exit cleanly; query will never run regardless
                 if (getState().isDone()) {
                     break;
                 }
 
                 long start = System.nanoTime();
                 Set<SplitpendingSplits = ImmutableSet.copyOf(splitSource.getNextBatch());
                 .add(System.nanoTime() - start);
 
                 while (!pendingSplits.isEmpty() && !getState().isDone()) {
                     Multimap<NodeSplitsplitAssignment = .computeAssignments(pendingSplits.values());
                     pendingSplits = ImmutableSet.copyOf(Sets.difference(pendingSplits, ImmutableSet.copyOf(splitAssignment.values())));
 
                     assignSplits(nextTaskIdsplitAssignment);
 
                     if (!pendingSplits.isEmpty()) {
                         waitForFreeNode(nextTaskId);
                     }
                 }
             }
         }
 
         for (RemoteTask task : .values()) {
             task.noMoreSplits(.getPartitionedSource());
         }
 
         // tell sub stages there will be no more output buffers
         setNoMoreStageNodes();
     }
 
     private void assignSplits(AtomicInteger nextTaskIdMultimap<NodeSplitsplitAssignment)
     {
         for (Entry<NodeCollection<Split>> taskSplits : splitAssignment.asMap().entrySet()) {
             long scheduleSplitStart = System.nanoTime();
             Node node = taskSplits.getKey();
 
             TaskId taskId = Iterables.getOnlyElement(.get(node), null);
             RemoteTask task = taskId != null ? .get(taskId) : null;
             if (task == null) {
                 RemoteTask remoteTask = scheduleTask(nextTaskId.getAndIncrement(), node.getPartitionedSource(), taskSplits.getValue());
 
                 // tell the sub stages to create a buffer for this task
                 addStageNode(remoteTask.getTaskInfo().getTaskId());
 
                 .add(System.nanoTime() - scheduleSplitStart);
             }
             else {
                 task.addSplits(.getPartitionedSource(), taskSplits.getValue());
                 .add(System.nanoTime() - scheduleSplitStart);
             }
         }
     }
 
     private void waitForFreeNode(AtomicInteger nextTaskId)
     {
         // if we have sub stages...
         if (!.isEmpty()) {
             // before we block, we need to create all possible output buffers on the sub stages, or they can deadlock
             // waiting for the "noMoreBuffers" call
             .lockDownNodes();
             for (Node node : Sets.difference(new HashSet<>(.allNodes()), .keySet())) {
                 RemoteTask remoteTask = scheduleTask(nextTaskId.getAndIncrement(), node);
 
                 // tell the sub stages to create a buffer for this task
                 addStageNode(remoteTask.getTaskInfo().getTaskId());
             }
             // tell sub stages there will be no more output buffers
             setNoMoreStageNodes();
         }
 
         synchronized (this) {
             // otherwise wait for some tasks to complete
             try {
                 // todo this adds latency: replace this wait with an event listener
                 ..timedWait(this, 100);
             }
             catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw Throwables.propagate(e);
             }
         }
         updateNewExchangesAndBuffers(false);
     }
 
     private void addStageNode(TaskId task)
     {
         for (StageExecutionNode subStage : .values()) {
             subStage.parentTasksAdded(ImmutableList.of(task), false);
         }
     }
 
     private void setNoMoreStageNodes()
     {
         for (StageExecutionNode subStage : .values()) {
             subStage.parentTasksAdded(ImmutableList.<TaskId>of(), true);
         }
     }
 
     private RemoteTask scheduleTask(int idNode node)
     {
         return scheduleTask(idnodenull, ImmutableList.<Split>of());
     }
 
     private RemoteTask scheduleTask(int idNode nodePlanNodeId sourceIdIterable<? extends SplitsourceSplits)
     {
         // before scheduling a new task update all existing tasks with new exchanges and output buffers
         addNewExchangesAndBuffers();
 
         TaskId taskId = new TaskId(, String.valueOf(id));
 
         ImmutableMultimap.Builder<PlanNodeIdSplitinitialSplits = ImmutableMultimap.builder();
         for (Split sourceSplit : sourceSplits) {
             initialSplits.put(sourceIdsourceSplit);
         }
         for (Entry<PlanNodeIdURIentry : .get().entries()) {
             initialSplits.put(entry.getKey(), createRemoteSplitFor(taskIdentry.getValue()));
         }
 
                 taskId,
                 node,
                 ,
                 initialSplits.build(),
                 getCurrentOutputBuffers());
 
         task.addStateChangeListener(taskInfo -> doUpdateState());
 
         // create and update task
         task.start();
 
         // record this task
         .put(task.getTaskInfo().getTaskId(), task);
         .put(nodetask.getTaskInfo().getTaskId());
         .addTask(nodetask);
 
         // update in case task finished before listener was registered
         doUpdateState();
 
         return task;
     }
 
     private void updateNewExchangesAndBuffers(boolean waitUntilFinished)
     {
         checkState(!Thread.holdsLock(this), "Can not add exchanges or buffers to tasks while holding a lock on this");
 
         while (!getState().isDone()) {
             boolean finished = addNewExchangesAndBuffers();
 
             if (finished || !waitUntilFinished) {
                 return;
             }
 
             synchronized (this) {
                 // wait for a state change
                 //
                 // NOTE this must be a wait with a timeout since there is no notification
                 // for new exchanges from the child stages
                 try {
                     ..timedWait(this, 100);
                 }
                 catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     throw Throwables.propagate(e);
                 }
             }
         }
     }
 
     private boolean addNewExchangesAndBuffers()
     {
         // get new exchanges and update exchange state
         Set<PlanNodeIdcompleteSources = updateCompleteSources();
         boolean allSourceComplete = completeSources.containsAll();
         Multimap<PlanNodeIdURInewExchangeLocations = getNewExchangeLocations();
         .set(ImmutableMultimap.<PlanNodeIdURI>builder()
                 .putAll(.get())
                 .putAll(newExchangeLocations)
                 .build());
 
         // get new output buffer and update output buffer state
         OutputBuffers outputBuffers = updateToNextOutputBuffers();
 
         // finished state must be decided before update to avoid race conditions
         boolean finished = allSourceComplete && outputBuffers.isNoMoreBufferIds();
 
         // update tasks
         for (RemoteTask task : .values()) {
             for (Entry<PlanNodeIdURIentry : newExchangeLocations.entries()) {
                 Split remoteSplit = createRemoteSplitFor(task.getTaskInfo().getTaskId(), entry.getValue());
                 task.addSplits(entry.getKey(), ImmutableList.of(remoteSplit));
             }
             task.setOutputBuffers(outputBuffers);
             completeSources.forEach(task::noMoreSplits);
         }
 
         return finished;
     }
 
     private Set<PlanNodeIdupdateCompleteSources()
     {
         for (RemoteSourceNode remoteSourceNode : .getRemoteSourceNodes()) {
             if (!.contains(remoteSourceNode.getId())) {
                 boolean exchangeFinished = true;
                 for (PlanFragmentId planFragmentId : remoteSourceNode.getSourceFragmentIds()) {
                     StageExecutionNode subStage = .get(planFragmentId);
                     switch (subStage.getState()) {
                         case :
                         case :
                             exchangeFinished = false;
                             break;
                     }
                 }
                 if (exchangeFinished) {
                     .add(remoteSourceNode.getId());
                 }
             }
         }
         return ;
     }
 
     @SuppressWarnings("NakedNotify")
     public void doUpdateState()
     {
         checkState(!Thread.holdsLock(this), "Can not doUpdateState while holding a lock on this");
 
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             synchronized (this) {
                 // wake up worker thread waiting for state changes
                 this.notifyAll();
 
                 StageState currentState = .get();
                 if (currentState.isDone()) {
                     return;
                 }
 
                 List<StageStatesubStageStates = ImmutableList.copyOf(transform(.values(), StageExecutionNode::getState));
                 if (subStageStates.stream().anyMatch(StageState::isFailure)) {
                     .set(.);
                 }
                 else {
                     List<TaskStatetaskStates = ImmutableList.copyOf(transform(transform(.values(), RemoteTask::getTaskInfo), TaskInfo::getState));
                     if (any(taskStatesequalTo(.))) {
                         .set(.);
                     }
                     else if (any(taskStatesequalTo(.))) {
                         // A task should only be in the aborted state if the STAGE is done (ABORTED or FAILED)
                         .set(.);
                         .add(new PrestoException(."A task is in the ABORTED state but stage is " + currentState));
                     }
                     else if (currentState != . && currentState != .) {
                         // all tasks are now scheduled, so we can check the finished state
                         if (all(taskStates, TaskState::isDone)) {
                             .set(.);
                         }
                         else if (any(taskStatesequalTo(.))) {
                             .set(.);
                         }
                     }
                 }
             }
 
             // finish tasks and stages if stage is complete
             StageState stageState = this..get();
             if (stageState == .) {
                 abort();
             }
             else if (stageState.isDone()) {
                 cancel();
             }
         }
     }
 
     @Override
     public void cancel()
     {
         checkState(!Thread.holdsLock(this), "Can not cancel while holding a lock on this");
 
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             // check if the stage already completed naturally
             doUpdateState();
             synchronized (this) {
                 if (!.get().isDone()) {
                     .debug("Cancelling stage %s");
                     .set(.);
                 }
             }
 
             // cancel all tasks
             .values().forEach(RemoteTask::cancel);
 
             // propagate cancel to sub-stages
             .values().forEach(StageExecutionNode::cancel);
         }
     }
 
     @Override
     public void abort()
     {
         checkState(!Thread.holdsLock(this), "Can not abort while holding a lock on this");
 
         try (SetThreadName ignored = new SetThreadName("Stage-%s")) {
             // transition to aborted state, only if not already finished
             doUpdateState();
             synchronized (this) {
                 if (!.get().isDone()) {
                     .debug("Aborting stage %s");
                     .set(.);
                 }
             }
 
             // abort all tasks
             .values().forEach(RemoteTask::abort);
 
             // propagate abort to sub-stages
             .values().forEach(StageExecutionNode::abort);
         }
     }
 
     private static Split createRemoteSplitFor(TaskId taskIdURI taskLocation)
     {
         URI splitLocation = uriBuilderFrom(taskLocation).appendPath("results").appendPath(taskId.toString()).build();
         return new Split("remote"new RemoteSplit(splitLocation));
     }
 
     @Override
     public String toString()
     {
         return toStringHelper(this)
                 .add("stageId")
                 .add("location")
                 .add("stageState".get())
                 .toString();
     }
 
     private static Optional<IntegergetHashChannel(PlanFragment fragment)
     {
         return fragment.getHash().map(symbol -> fragment.getOutputLayout().indexOf(symbol));
     }
 
     private static List<IntegergetPartitioningChannels(PlanFragment fragment)
     {
         checkState(fragment.getOutputPartitioning() == ."fragment is not hash partitioned");
         // We can convert the symbols directly into channels, because the root must be a sink and therefore the layout is fixed
         return fragment.getPartitionBy().stream()
                 .map(symbol -> fragment.getOutputLayout().indexOf(symbol))
                 .collect(toImmutableList());
     }
 }
 
 /*
  * Since the execution is a tree of SqlStateExecutions, each stage can directly access
  * the private fields and methods of stages up and down the tree.  To prevent accidental
  * errors, each stage reference parents and children using this interface so direct