Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static java.lang.Math.max;
 import static java.lang.String.format;
 
 public class SqlTaskExecution
 {
     private final TaskId taskId;
     private final TaskStateMachine taskStateMachine;
     private final TaskContext taskContext;
     private final SharedBuffer sharedBuffer;
 
     private final TaskHandle taskHandle;
     private final TaskExecutor taskExecutor;
 
     private final Executor notificationExecutor;
 
     private final QueryMonitor queryMonitor;
 
     private final List<WeakReference<Driver>> drivers = new CopyOnWriteArrayList<>();

    
Number of drivers that have been sent to the TaskExecutor that have not finished.
 
     private final AtomicInteger remainingDrivers = new AtomicInteger();
 
     // guarded for update only
     @GuardedBy("this")
     private final ConcurrentMap<PlanNodeIdTaskSourceunpartitionedSources = new ConcurrentHashMap<>();
 
     @GuardedBy("this")
     private long maxAcknowledgedSplit = .;
 
     private final PlanNodeId partitionedSourceId;
            TaskStateMachine taskStateMachine,
            TaskContext taskContext,
            SharedBuffer sharedBuffer,
            PlanFragment fragment,
            List<TaskSourcesources,
            LocalExecutionPlanner planner,
            TaskExecutor taskExecutor,
            Executor notificationExecutor,
            QueryMonitor queryMonitor)
    {
        SqlTaskExecution task = new SqlTaskExecution(
                taskStateMachinetaskContextsharedBufferfragment,
                planner,
                taskExecutor,
                queryMonitor,
                notificationExecutor
        );
        try (SetThreadName ignored = new SetThreadName("Task-%s"task.getTaskId())) {
            task.start();
            task.addSources(sources);
            return task;
        }
    }
    private SqlTaskExecution(
            TaskStateMachine taskStateMachine,
            TaskContext taskContext,
            SharedBuffer sharedBuffer,
            PlanFragment fragment,
            LocalExecutionPlanner planner,
            TaskExecutor taskExecutor,
            QueryMonitor queryMonitor,
            Executor notificationExecutor)
    {
        this. = checkNotNull(taskStateMachine"taskStateMachine is null");
        this. = taskStateMachine.getTaskId();
        this. = checkNotNull(taskContext"taskContext is null");
        this. = checkNotNull(sharedBuffer"sharedBuffer is null");
        this. = checkNotNull(taskExecutor"driverExecutor is null");
        this. = checkNotNull(notificationExecutor"notificationExecutor is null");
        this. = checkNotNull(queryMonitor"queryMonitor is null");
        try (SetThreadName ignored = new SetThreadName("Task-%s")) {
            List<DriverFactorydriverFactories;
            try {
                OutputFactory outputOperatorFactory;
                if (fragment.getOutputPartitioning() == .) {
                    outputOperatorFactory = new TaskOutputFactory(sharedBuffer);
                }
                else if (fragment.getOutputPartitioning() == .) {
                    outputOperatorFactory = new PartitionedOutputFactory(sharedBuffer);
                }
                else {
                    throw new PrestoException(format("OutputPartitioning %s is not supported"fragment.getOutputPartitioning()));
                }
                LocalExecutionPlan localExecutionPlan = planner.plan(
                        taskContext.getSession(),
                        fragment.getRoot(),
                        fragment.getOutputLayout(),
                        fragment.getSymbols(),
                        fragment.getDistribution(),
                        outputOperatorFactory);
                driverFactories = localExecutionPlan.getDriverFactories();
            }
            catch (Throwable e) {
                // planning failed
                taskStateMachine.failed(e);
                throw Throwables.propagate(e);
            }
            // index driver factories
            DriverSplitRunnerFactory partitionedDriverFactory = null;
            ImmutableList.Builder<DriverSplitRunnerFactoryunpartitionedDriverFactories = ImmutableList.builder();
            for (DriverFactory driverFactory : driverFactories) {
                if (driverFactory.getSourceIds().contains(fragment.getPartitionedSource())) {
                    checkState(partitionedDriverFactory == null"multiple partitioned sources are not supported");
                    partitionedDriverFactory = new DriverSplitRunnerFactory(driverFactory);
                }
                else {
                    unpartitionedDriverFactories.add(new DriverSplitRunnerFactory(driverFactory));
                }
            }
            this. = unpartitionedDriverFactories.build();
            if (fragment.getDistribution() == .) {
                checkArgument(partitionedDriverFactory != null"Fragment is partitioned, but no partitioned driver found");
            }
            this. = fragment.getPartitionedSource();
            this. = partitionedDriverFactory;
            // don't register the task if it is already completed (most likely failed during planning above)
            if (!taskStateMachine.getState().isDone()) {
                 = taskExecutor.addTask();
                taskStateMachine.addStateChangeListener(new RemoveTaskHandleWhenDone(taskExecutor));
            }
            else {
                 = null;
            }
            sharedBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this));
        }
    }
    //
    // This code starts registers a callback with access to this class, and this
    // call back is access from another thread, so this code can not be placed in the constructor
    private void start()
    {
        // start unpartitioned drivers
        List<DriverSplitRunnerrunners = new ArrayList<>();
        for (DriverSplitRunnerFactory driverFactory : ) {
            for (int i = 0; i < driverFactory.getDriverInstances(); i++) {
                runners.add(driverFactory.createDriverRunner(nullfalse));
            }
            driverFactory.setNoMoreSplits();
        }
        enqueueDrivers(truerunners);
    }
    public TaskId getTaskId()
    {
        return ;
    }
    public TaskContext getTaskContext()
    {
        return ;
    }
    public void addSources(List<TaskSourcesources)
    {
        checkNotNull(sources"sources is null");
        checkState(!Thread.holdsLock(this), "Can not add sources while holding a lock on the %s"getClass().getSimpleName());
        try (SetThreadName ignored = new SetThreadName("Task-%s")) {
            // update our record of sources and schedule drivers for new partitioned splits
            Map<PlanNodeIdTaskSourceupdatedUnpartitionedSources = updateSources(sources);
            // tell existing drivers about the new splits; it is safe to update drivers
            // multiple times and out of order because sources contain full record of
            // the unpartitioned splits
            for (TaskSource source : updatedUnpartitionedSources.values()) {
                // tell all the existing drivers this source is finished
                for (WeakReference<DriverdriverReference : ) {
                    Driver driver = driverReference.get();
                    // the driver can be GCed due to a failure or a limit
                    if (driver != null) {
                        driver.updateSource(source);
                    }
                    else {
                        // remove the weak reference from the list to avoid a memory leak
                        // NOTE: this is a concurrent safe operation on a CopyOnWriteArrayList
                        .remove(driverReference);
                    }
                }
            }
            // we may have transitioned to no more splits, so check for completion
            checkTaskCompletion();
        }
    }
    private synchronized Map<PlanNodeIdTaskSourceupdateSources(List<TaskSourcesources)
    {
        Map<PlanNodeIdTaskSourceupdatedUnpartitionedSources = new HashMap<>();
        // don't update maxAcknowledgedSplit until the end because task sources may not
        // be in sorted order and if we updated early we could skip splits
        long newMaxAcknowledgedSplit = ;
        for (TaskSource source : sources) {
            PlanNodeId sourceId = source.getPlanNodeId();
            if (sourceId.equals()) {
                // partitioned split
                ImmutableList.Builder<DriverSplitRunnerrunners = ImmutableList.builder();
                for (ScheduledSplit scheduledSplit : source.getSplits()) {
                    // only add a split if we have not already scheduled it
                    if (scheduledSplit.getSequenceId() > ) {
                        // create a new driver for the split
                        runners.add(.createDriverRunner(scheduledSplittrue));
                        newMaxAcknowledgedSplit = max(scheduledSplit.getSequenceId(), newMaxAcknowledgedSplit);
                    }
                }
                enqueueDrivers(falserunners.build());
                if (source.isNoMoreSplits()) {
                    .setNoMoreSplits();
                }
            }
            else {
                // unpartitioned split
                // update newMaxAcknowledgedSplit
                for (ScheduledSplit scheduledSplit : source.getSplits()) {
                    newMaxAcknowledgedSplit = max(scheduledSplit.getSequenceId(), newMaxAcknowledgedSplit);
                }
                // create new source
                TaskSource newSource;
                TaskSource currentSource = .get(sourceId);
                if (currentSource == null) {
                    newSource = source;
                }
                else {
                    newSource = currentSource.update(source);
                }
                // only record new source if something changed
                if (newSource != currentSource) {
                    .put(sourceIdnewSource);
                    updatedUnpartitionedSources.put(sourceIdnewSource);
                }
            }
        }
         = newMaxAcknowledgedSplit;
        return updatedUnpartitionedSources;
    }
    private synchronized void enqueueDrivers(boolean forceRunSplitList<DriverSplitRunnerrunners)
    {
        // schedule driver to be executed
        List<ListenableFuture<?>> finishedFutures = .enqueueSplits(forceRunSplitrunners);
        checkState(finishedFutures.size() == runners.size(), "Expected %s futures but got %s"runners.size(), finishedFutures.size());
        // record new driver
        .addAndGet(finishedFutures.size());
        // when driver completes, update state and fire events
        for (int i = 0; i < finishedFutures.size(); i++) {
            ListenableFuture<?> finishedFuture = finishedFutures.get(i);
            final DriverSplitRunner splitRunner = runners.get(i);
            Futures.addCallback(finishedFuturenew FutureCallback<Object>()
            {
                @Override
                public void onSuccess(Object result)
                {
                    try (SetThreadName ignored = new SetThreadName("Task-%s")) {
                        // record driver is finished
                        .decrementAndGet();
                        checkTaskCompletion();
                        .splitCompletionEvent(getDriverStats());
                    }
                }
                @Override
                public void onFailure(Throwable cause)
                {
                    try (SetThreadName ignored = new SetThreadName("Task-%s")) {
                        .failed(cause);
                        // record driver is finished
                        .decrementAndGet();
                        // fire failed event with cause
                        .splitFailedEvent(getDriverStats(), cause);
                    }
                }
                private DriverStats getDriverStats()
                {
                    DriverContext driverContext = splitRunner.getDriverContext();
                    DriverStats driverStats;
                    if (driverContext != null) {
                        driverStats = driverContext.getDriverStats();
                    }
                    else {
                        // split runner did not start successfully
                        driverStats = new DriverStats();
                    }
                    return driverStats;
                }
            }, );
        }
    }
    public Set<PlanNodeIdgetNoMoreSplits()
    {
        ImmutableSet.Builder<PlanNodeIdnoMoreSplits = ImmutableSet.builder();
        if ( != null && .isNoMoreSplits()) {
            noMoreSplits.add();
        }
        for (TaskSource taskSource : .values()) {
            if (taskSource.isNoMoreSplits()) {
                noMoreSplits.add(taskSource.getPlanNodeId());
            }
        }
        return noMoreSplits.build();
    }
    private synchronized void checkTaskCompletion()
    {
        if (.getState().isDone()) {
            return;
        }
        // are there more partition splits expected?
        if ( != null && !.isNoMoreSplits()) {
            return;
        }
        // do we still have running tasks?
        if (.get() != 0) {
            return;
        }
        // no more output will be created
        .setNoMorePages();
        // are there still pages in the output buffer
        if (!.isFinished()) {
            return;
        }
        // Cool! All done!
        .finished();
    }
    public void cancel()
    {
        // todo this should finish all input sources and let the task finish naturally
        try (SetThreadName ignored = new SetThreadName("Task-%s")) {
            .cancel();
        }
    }
    public void fail(Throwable cause)
    {
        try (SetThreadName ignored = new SetThreadName("Task-%s")) {
            .failed(cause);
        }
    }
    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("taskId")
                .add("remainingDrivers")
                .add("unpartitionedSources")
                .toString();
    }
    private class DriverSplitRunnerFactory
    {
        private final DriverFactory driverFactory;
        private final PipelineContext pipelineContext;
        private final AtomicInteger pendingCreation = new AtomicInteger();
        private final AtomicBoolean noMoreSplits = new AtomicBoolean();
        private DriverSplitRunnerFactory(DriverFactory driverFactory)
        {
            this. = driverFactory;
            this. = .addPipelineContext(driverFactory.isInputDriver(), driverFactory.isOutputDriver());
        }
        private DriverSplitRunner createDriverRunner(@Nullable ScheduledSplit partitionedSplitboolean partitioned)
        {
            .incrementAndGet();
            // create driver context immediately so the driver existence is recorded in the stats
            // the number of drivers is used to balance work across nodes
            DriverContext driverContext = .addDriverContext(partitioned);
            return new DriverSplitRunner(thisdriverContextpartitionedSplit);
        }
        private Driver createDriver(DriverContext driverContext, @Nullable ScheduledSplit partitionedSplit)
        {
            Driver driver = .createDriver(driverContext);
            // record driver so other threads add unpartitioned sources can see the driver
            // NOTE: this MUST be done before reading unpartitionedSources, so we see a consistent view of the unpartitioned sources
            .add(new WeakReference<>(driver));
            if (partitionedSplit != null) {
                // TableScanOperator requires partitioned split to be added before the first call to process
                driver.updateSource(new TaskSource(, ImmutableSet.of(partitionedSplit), true));
            }
            // add unpartitioned sources
            for (TaskSource source : .values()) {
                driver.updateSource(source);
            }
            .decrementAndGet();
            closeDriverFactoryIfFullyCreated();
            return driver;
        }
        private boolean isNoMoreSplits()
        {
            return .get();
        }
        private void setNoMoreSplits()
        {
            .set(true);
            closeDriverFactoryIfFullyCreated();
        }
        private void closeDriverFactoryIfFullyCreated()
        {
            if (isNoMoreSplits() && .get() <= 0) {
                .close();
            }
        }
        public int getDriverInstances()
        {
            return .getDriverInstances();
        }
    }
    private static class DriverSplitRunner
            implements SplitRunner
    {
        private final DriverSplitRunnerFactory driverSplitRunnerFactory;
        private final DriverContext driverContext;
        @GuardedBy("this")
        private boolean closed;
        @Nullable
        private final ScheduledSplit partitionedSplit;
        @GuardedBy("this")
        private Driver driver;
        private DriverSplitRunner(DriverSplitRunnerFactory driverSplitRunnerFactoryDriverContext driverContext, @Nullable ScheduledSplit partitionedSplit)
        {
            this. = checkNotNull(driverSplitRunnerFactory"driverFactory is null");
            this. = checkNotNull(driverContext"driverContext is null");
            this. = partitionedSplit;
        }
        public synchronized DriverContext getDriverContext()
        {
            if ( == null) {
                return null;
            }
            return .getDriverContext();
        }
        @Override
        public synchronized boolean isFinished()
        {
            if () {
                return true;
            }
            if ( == null) {
                return false;
            }
            return .isFinished();
        }
        @Override
        public ListenableFuture<?> processFor(Duration duration)
        {
            Driver driver;
            synchronized (this) {
                // if close() was called before we get here, there's not point in even creating the driver
                if () {
                    return Futures.immediateFuture(null);
                }
                if (this. == null) {
                    this. = .createDriver();
                }
                driver = this.;
            }
            return driver.processFor(duration);
        }
        @Override
        public void close()
        {
            Driver driver;
            synchronized (this) {
                 = true;
                driver = this.;
            }
            if (driver != null) {
                driver.close();
            }
        }
    }
    private static final class RemoveTaskHandleWhenDone
            implements StateChangeListener<TaskState>
    {
        private final TaskExecutor taskExecutor;
        private final TaskHandle taskHandle;
        private RemoveTaskHandleWhenDone(TaskExecutor taskExecutorTaskHandle taskHandle)
        {
            this. = checkNotNull(taskExecutor"taskExecutor is null");
            this. = checkNotNull(taskHandle"taskHandle is null");
        }
        @Override
        public void stateChanged(TaskState newState)
        {
            if (newState.isDone()) {
                .removeTask();
            }
        }
    }
    private static final class CheckTaskCompletionOnBufferFinish
            implements StateChangeListener<BufferState>
    {
        public CheckTaskCompletionOnBufferFinish(SqlTaskExecution sqlTaskExecution)
        {
            // we are only checking for completion of the task, so don't hold up GC if the task is dead
            this. = new WeakReference<>(sqlTaskExecution);
        }
        @Override
        public void stateChanged(BufferState newState)
        {
            if (newState == .) {
                SqlTaskExecution sqlTaskExecution = .get();
                if (sqlTaskExecution != null) {
                    sqlTaskExecution.checkTaskCompletion();
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X