Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static com.facebook.presto.operator.Operator.NOT_BLOCKED;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 //
 // NOTE:  As a general strategy the methods should "stage" a change and only
 // process the actual change before lock release (DriverLockResult.close()).
 // This ensures that only one thread will be working with the operators at a
 // time and that state changer threads are not blocked.
 //
 public class Driver
 {
     // Class-wide logger.
     private static final Logger log = Logger.get(Driver.class);
 
     // NOTE(review): the generic type arguments on the declarations below are fused
     // with the field names (e.g. "List<Operatoroperators"), and the initial value
     // passed to the AtomicReference is reduced to "."; this looks like extraction
     // damage and must be restored from the upstream file before this compiles.
     private final DriverContext driverContext;
     private final List<Operatoroperators;
     private final Map<PlanNodeIdSourceOperatorsourceOperators;
     // Concurrent map initialised empty; presumably the staging area for source
     // updates that have not been applied yet — TODO confirm against updateSource().
     private final ConcurrentMap<PlanNodeIdTaskSourcenewSources = new ConcurrentHashMap<>();
 
     // Driver lifecycle state, wrapped in an AtomicReference.
     private final AtomicReference<Statestate = new AtomicReference<>(.);
 
     // Reentrant lock; other fields are annotated as guarded by it.
     private final ReentrantLock exclusiveLock = new ReentrantLock();
 
     // Annotated as guarded by the Driver monitor ("this"); presumably the thread
     // that currently owns exclusiveLock — TODO confirm.
     @GuardedBy("this")
     private Thread lockHolder;
 
     // Annotated as guarded by exclusiveLock; initialised to an empty concurrent map.
     @GuardedBy("exclusiveLock")
     private Map<PlanNodeIdTaskSourcecurrentSources = new ConcurrentHashMap<>();
 
     private enum State
     {
         ALIVE, NEED_DESTRUCTION, DESTROYED
     }
 
     /**
      * Convenience constructor: null-checks the driver context, the first operator
      * and the varargs array, collects the operators into an ImmutableList, and
      * delegates to another constructor via this(...).
      *
      * NOTE(review): the parameter list and each checkNotNull call are missing the
      * commas between their arguments in this copy (apparent extraction damage);
      * restore them before compiling.
      */
     public Driver(DriverContext driverContextOperator firstOperatorOperator... otherOperators)
     {
         this(checkNotNull(driverContext"driverContext is null"),
                 ImmutableList.<Operator>builder()
                         .add(checkNotNull(firstOperator"firstOperator is null"))
                         .add(checkNotNull(otherOperators"otherOperators is null"))
                         .build());
     }
 
     /**
      * Primary constructor: null-checks both arguments, takes an immutable copy of
      * the operator list, rejects an empty list, and builds a map from source id to
      * operator for every operator that is a SourceOperator.
      *
      * NOTE(review): the targets of the three "this. =" assignments, the commas in
      * the parameter list / checkNotNull calls, the generic arguments of the map
      * builder and the closing parenthesis of the cast are missing in this copy
      * (apparent extraction damage); restore them before compiling.
      */
     public Driver(DriverContext driverContextList<Operatoroperators)
     {
         this. = checkNotNull(driverContext"driverContext is null");
         this. = ImmutableList.copyOf(checkNotNull(operators"operators is null"));
         // at least one operator is required
         checkArgument(!operators.isEmpty(), "There must be at least one operator");
 
         // index the source operators by the id returned from getSourceId()
         ImmutableMap.Builder<PlanNodeIdSourceOperatorsourceOperators = ImmutableMap.builder();
         for (Operator operator : operators) {
             if (operator instanceof SourceOperator) {
                 SourceOperator sourceOperator = (SourceOperatoroperator;
                 sourceOperators.put(sourceOperator.getSourceId(), sourceOperator);
             }
         }
         this. = sourceOperators.build();
    }
    // NOTE(review): this body has no method header and its return statement has no
    // expression; both appear to have been lost in extraction. Presumably a simple
    // accessor — TODO restore the signature and returned field from the upstream file.
    {
        return ;
    }
    /**
     * Returns the key set of a map.
     *
     * NOTE(review): the map receiver before ".keySet()" and the closing ">" of the
     * return type are missing in this copy; presumably the source-operator map
     * built in the constructor — TODO confirm.
     */
    public Set<PlanNodeIdgetSourceIds()
    {
        return .keySet();
    }
    /**
     * Requests shutdown of the driver. Tries to take the driver lock without
     * waiting; if the lock cannot be acquired, interrupts a thread under the Driver
     * monitor instead. When the lock is acquired, cleanup happens as part of
     * releasing it (see the try-with-resources below).
     *
     * NOTE(review): the conditional that guards the early "return;" below, the
     * TimeUnit argument, and the field tested/interrupted inside the synchronized
     * block are missing in this copy (apparent extraction damage).
     */
    public void close()
    {
        // mark the service for destruction
            return;
        }
        // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
        try (DriverLockResult lockResult = tryLockAndProcessPendingStateChanges(0, .)) {
            // if we did not get the lock, interrupt the lock holder
            if (!lockResult.wasAcquired()) {
                // there is a benign race condition here where the lock holder
                // can change between attempting to get the lock and grabbing
                // the synchronized lock here, but in either case we want to
                // interrupt the lock holder thread
                synchronized (this) {
                    if ( != null) {
                        .interrupt();
                    }
                }
            }
            // clean shutdown is automatically triggered during lock release
        }
    }
    /**
     * Reports whether the driver is finished. Must not be called by a thread that
     * already holds the driver lock. Tries to take the lock without waiting: with
     * the lock, the answer comes from isFinishedInternal(); without it, the answer
     * is computed from two checks that do not touch the operators.
     *
     * NOTE(review): the TimeUnit argument and the receivers/constants in the
     * fallback expression are missing in this copy (apparent extraction damage).
     *
     * @return true if the driver is considered finished
     */
    public boolean isFinished()
    {
        checkLockNotHeld("Can not check finished status while holding the driver lock");
        // if we can get the lock, attempt a clean shutdown; otherwise someone else will shutdown
        try (DriverLockResult lockResult = tryLockAndProcessPendingStateChanges(0, .)) {
            if (lockResult.wasAcquired()) {
                return isFinishedInternal();
            }
            else {
                // did not get the lock, so we can't check operators, or destroy
                return .get() != . || .isDone();
            }
        }
    }
    /**
     * Lock-held variant of the finished check. Combines three conditions with OR;
     * the last one asks the final element of a list (index size() - 1) whether it
     * is finished.
     *
     * NOTE(review): the receivers/constants in the expression are missing, and the
     * body of the "if (finished)" branch is empty in this copy — a statement was
     * probably lost in extraction; TODO restore from the upstream file.
     *
     * @return the computed finished flag
     */
    private boolean isFinishedInternal()
    {
        checkLockHeld("Lock must be held to call isFinishedInternal");
        boolean finished = .get() != . || .isDone() || .get(.size() - 1).isFinished();
        if (finished) {
        }
        return finished;
    }
    /**
     * Stages an update for the given source without requiring the driver lock.
     * Uses a putIfAbsent / replace retry loop so that concurrent callers merge
     * their updates instead of overwriting each other.
     *
     * NOTE(review): the map receivers, the comma inside the replace(...) call, and
     * the statement that the two trailing comments describe are missing in this
     * copy (apparent extraction damage).
     *
     * @param source the source update to stage; ignored when containsKey() reports
     *               no entry for its plan node id
     */
    public void updateSource(TaskSource source)
    {
        checkLockNotHeld("Can not update sources while holding the driver lock");
        // does this driver have an operator for the specified source?
        if (!.containsKey(source.getPlanNodeId())) {
            return;
        }
        // stage the new updates
        while (true) {
            // attempt to update directly to the new source
            TaskSource currentNewSource = .putIfAbsent(source.getPlanNodeId(), source);
            // if update succeeded, just break
            if (currentNewSource == null) {
                break;
            }
            // merge source into the current new source
            TaskSource newSource = currentNewSource.update(source);
            // if this is not a new source, just return
            // (update() handing back the same instance is treated as "nothing changed")
            if (newSource == currentNewSource) {
                break;
            }
            // attempt to replace the currentNewSource with the new source
            if (.replace(source.getPlanNodeId(), currentNewSourcenewSource)) {
                break;
            }
            // someone else updated while we were processing
            // (the replace failed, so loop and retry against the latest value)
        }
        // attempt to get the lock and process the updates we staged above
        // updates will be processed in close if and only if we got the lock
    }
    /**
     * Applies staged source updates while the driver lock is held. Returns early
     * when the state check fails; otherwise iterates over a local copy, removes each
     * entry from a map with the two-argument remove(key, value), and hands the
     * value to processNewSource().
     *
     * NOTE(review): as written the local HashMap is created empty, so the loop
     * would never run; the constructor argument (presumably the staging map) looks
     * to have been lost in extraction, along with the state receiver/constant, the
     * remove() receiver and the commas in the generic types — TODO restore.
     */
    private void processNewSources()
    {
        checkLockHeld("Lock must be held to call processNewSources");
        // only update if the driver is still alive
        if (.get() != .) {
            return;
        }
        // copy the pending sources
        // it is ok to "miss" a source added during the copy as it will be
        // handled on the next call to this method
        Map<PlanNodeIdTaskSourcesources = new HashMap<>();
        for (Entry<PlanNodeIdTaskSourceentry : sources.entrySet()) {
            // Remove the entries we are going to process from the newSources map.
            // It is ok if someone already updated the entry; we will catch it on
            // the next iteration.
            .remove(entry.getKey(), entry.getValue());
            processNewSource(entry.getValue());
        }
    }
    /**
     * Applies one source update while the driver lock is held: works out which
     * splits are new relative to the stored source for the same plan node id,
     * stores the merged source, passes each new split to the matching source
     * operator, and signals noMoreSplits() when the update says so.
     *
     * NOTE(review): the map receivers and the closing ">" of the Set type are
     * missing in this copy (apparent extraction damage). The checkLockHeld message
     * names "processNewSources" (plural); left untouched since it is a runtime string.
     *
     * @param source the source update to apply
     */
    private void processNewSource(TaskSource source)
    {
        checkLockHeld("Lock must be held to call processNewSources");
        // create new source
        Set<ScheduledSplitnewSplits;
        TaskSource currentSource = .get(source.getPlanNodeId());
        if (currentSource == null) {
            // nothing stored yet for this plan node: every split in the update is new
            newSplits = source.getSplits();
            .put(source.getPlanNodeId(), source);
        }
        else {
            // merge the current source and the specified source
            TaskSource newSource = currentSource.update(source);
            // if this is not a new source, just return
            if (newSource == currentSource) {
                return;
            }
            // find the new splits to add
            newSplits = Sets.difference(newSource.getSplits(), currentSource.getSplits());
            .put(source.getPlanNodeId(), newSource);
        }
        // add new splits
        for (ScheduledSplit newSplit : newSplits) {
            Split split = newSplit.getSplit();
            SourceOperator sourceOperator = .get(source.getPlanNodeId());
            if (sourceOperator != null) {
                sourceOperator.addSplit(split);
            }
        }
        // set no more splits
        // NOTE(review): unlike the loop above, this lookup is not null-checked
        if (source.isNoMoreSplits()) {
            .get(source.getPlanNodeId()).noMoreSplits();
        }
    }
    /**
     * Runs processInternal() repeatedly for up to roughly the given duration. Must
     * not be called while holding the driver lock. Waits up to 100 time units for
     * the lock; when acquired, loops until processInternal() returns a future that
     * is not done (returned to the caller), the elapsed System.nanoTime() reaches
     * maxRuntime, or isFinishedInternal() reports true.
     *
     * NOTE(review): the unit passed to roundTo(), the TimeUnit of the lock timeout,
     * the receivers of startProcessTimer()/recordProcessed(), the final return
     * value and the comma in checkNotNull are missing in this copy (apparent
     * extraction damage). maxRuntime is compared against nanoTime deltas, so the
     * missing unit is presumably nanoseconds — TODO confirm.
     *
     * @param duration maximum time to keep processing; must not be null
     * @return a not-done future from processInternal(), or the (missing) default
     */
    public ListenableFuture<?> processFor(Duration duration)
    {
        checkLockNotHeld("Can not process for a duration while holding the driver lock");
        checkNotNull(duration"duration is null");
        long maxRuntime = duration.roundTo(.);
        try (DriverLockResult lockResult = tryLockAndProcessPendingStateChanges(100, .)) {
            if (lockResult.wasAcquired()) {
                .startProcessTimer();
                try {
                    long start = System.nanoTime();
                    do {
                        ListenableFuture<?> future = processInternal();
                        // a future that is not done is handed straight back to the caller
                        if (!future.isDone()) {
                            return future;
                        }
                    }
                    while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
                }
                finally {
                    // runs on every exit path, including the early return above
                    .recordProcessed();
                }
            }
        }
        return ;
    }
    /**
     * Runs a single processInternal() pass. Must not be called while holding the
     * driver lock. Waits up to 100 time units for the lock and returns a default
     * value if it could not be acquired.
     *
     * NOTE(review): the TimeUnit argument and the value returned when the lock is
     * not acquired are missing in this copy (apparent extraction damage).
     *
     * @return the future returned by processInternal(), or the (missing) default
     */
    public ListenableFuture<?> process()
    {
        checkLockNotHeld("Can not process while holding the driver lock");
        try (DriverLockResult lockResult = tryLockAndProcessPendingStateChanges(100, .)) {
            if (!lockResult.wasAcquired()) {
                // this is unlikely to happen unless the driver is being
                // destroyed and in that case the caller should notice
                // this state change by calling isFinished
                return ;
            }
            return processInternal();
        }
    }
    /**
     * One pass over the operator pipeline, with the driver lock held. Walks each
     * adjacent (current, next) pair by index: returns early with the blocking
     * future if either side is blocked, moves at most one page from current to
     * next when current is not finished and next needs input, and calls
     * next.finish() once current is finished. Any Throwable is reported via
     * failed(t) and rethrown.
     *
     * NOTE(review): the receivers of isEmpty()/size()/isDone()/get()/failed() and
     * the value returned after the loop are missing in this copy (apparent
     * extraction damage).
     *
     * @return the blocking future of a blocked operator, or the (missing) default
     */
    private ListenableFuture<?> processInternal()
    {
        checkLockHeld("Lock must be held to call processInternal");
        try {
            // apply any staged source updates before moving data
            if (!.isEmpty()) {
                processNewSources();
            }
            // the bound is size() - 1 because each iteration looks at index i and i + 1
            for (int i = 0; i < .size() - 1 && !.isDone(); i++) {
                // check if current operator is blocked
                Operator current = .get(i);
                ListenableFuture<?> blocked = current.isBlocked();
                if (!blocked.isDone()) {
                    current.getOperatorContext().recordBlocked(blocked);
                    return blocked;
                }
                // check if next operator is blocked
                Operator next = .get(i + 1);
                blocked = next.isBlocked();
                if (!blocked.isDone()) {
                    next.getOperatorContext().recordBlocked(blocked);
                    return blocked;
                }
                // if the current operator is not finished and next operator needs input...
                if (!current.isFinished() && next.needsInput()) {
                    // get an output page from current operator
                    current.getOperatorContext().startIntervalTimer();
                    Page page = current.getOutput();
                    current.getOperatorContext().recordGetOutput(page);
                    // if we got an output page, add it to the next operator
                    if (page != null) {
                        next.getOperatorContext().startIntervalTimer();
                        next.addInput(page);
                        next.getOperatorContext().recordAddInput(page);
                    }
                }
                // if current operator is finished...
                // (checked again here, after the getOutput() call above)
                if (current.isFinished()) {
                    // let next operator know there will be no more data
                    next.getOperatorContext().startIntervalTimer();
                    next.finish();
                    next.getOperatorContext().recordFinish();
                }
            }
            return ;
        }
        catch (Throwable t) {
            // report the failure, then propagate it unchanged
            .failed(t);
            throw t;
        }
    }
    /**
     * Closes every operator and calls finished(), with the driver lock held. The
     * thread's interrupted flag is cleared for the duration and restored
     * afterwards. Failures while closing are funnelled through
     * addSuppressedException(); whatever that accumulates is rethrown at the end.
     *
     * NOTE(review): the conditional that guards the early "return;" below, the
     * collection iterated by the for loop, and the receivers of getTaskId() and
     * finished() are missing in this copy (apparent extraction damage).
     */
    private void destroyIfNecessary()
    {
        checkLockHeld("Lock must be held to call destroyIfNecessary");
            return;
        }
        // record the current interrupted status (and clear the flag); we'll reset it later
        boolean wasInterrupted = Thread.interrupted();
        // if we get an error while closing a driver, record it and we will throw it at the end
        Throwable inFlightException = null;
        try {
            for (Operator operator : ) {
                // each operator is closed in its own try so one failure does not skip the rest
                try {
                    operator.close();
                }
                catch (InterruptedException t) {
                    // don't record the stack
                    wasInterrupted = true;
                }
                catch (Throwable t) {
                    inFlightException = addSuppressedException(
                            inFlightException,
                            t,
                            "Error closing operator %s for task %s",
                            operator.getOperatorContext().getOperatorId(),
                            .getTaskId());
                }
            }
            .finished();
        }
        catch (Throwable t) {
            // this shouldn't happen but be safe
            inFlightException = addSuppressedException(
                    inFlightException,
                    t,
                    "Error destroying driver for task %s",
                    .getTaskId());
        }
        finally {
            // reset the interrupted flag
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (inFlightException != null) {
            // this will always be an Error or Runtime
            throw Throwables.propagate(inFlightException);
        }
    }
    /**
     * Folds a newly caught throwable into the exception being accumulated.
     * An Error becomes the in-flight exception if there is none yet, otherwise it
     * is attached as suppressed (unless it is the same instance). Anything that is
     * not an Error is passed to an error(...) call with the message and arguments,
     * and is not accumulated.
     *
     * NOTE(review): the commas in the parameter list and in the error(...) call,
     * and the receiver of error(...), are missing in this copy (apparent
     * extraction damage).
     *
     * @return the (possibly updated) in-flight exception; may be null
     */
    private Throwable addSuppressedException(Throwable inFlightExceptionThrowable newExceptionString messageObject... args)
    {
        if (newException instanceof Error) {
            if (inFlightException == null) {
                inFlightException = newException;
            }
            else {
                // Self-suppression not permitted
                if (inFlightException != newException) {
                    inFlightException.addSuppressed(newException);
                }
            }
        }
        else {
            // log normal exceptions instead of rethrowing them
            .error(newExceptionmessageargs);
        }
        return inFlightException;
    }
    /**
     * Creates a DriverLockResult for the given timeout after asserting that the
     * calling thread does not already hold the driver lock. The result is meant to
     * be used in try-with-resources; callers check wasAcquired() on it.
     *
     * NOTE(review): the commas between "timeout" and the TimeUnit parameter /
     * argument are missing in this copy (apparent extraction damage).
     */
    private DriverLockResult tryLockAndProcessPendingStateChanges(int timeoutTimeUnit unit)
    {
        checkLockNotHeld("Can not acquire the driver lock while already holding the driver lock");
        return new DriverLockResult(timeoutunit);
    }
    /**
     * Asserts, under the Driver monitor, a condition on the current thread via
     * checkState, using the given message on failure.
     *
     * NOTE(review): the right-hand operand of "!=" and the comma before "message"
     * are missing in this copy; presumably the comparison is against the recorded
     * lock-holder thread — TODO confirm and restore.
     */
    private synchronized void checkLockNotHeld(String message)
    {
        checkState(Thread.currentThread() != message);
    }
    /**
     * Asserts, under the Driver monitor, a condition on the current thread via
     * checkState, using the given message on failure.
     *
     * NOTE(review): the right-hand operand of "==" and the comma before "message"
     * are missing in this copy; presumably the comparison is against the recorded
     * lock-holder thread — TODO confirm and restore.
     */
    private synchronized void checkLockHeld(String message)
    {
        checkState(Thread.currentThread() == message);
    }
    /**
     * AutoCloseable handle for one attempt to take the driver lock. The attempt is
     * made in the constructor; close() is a no-op when it failed. When it
     * succeeded, close() runs processNewSources() and destroyIfNecessary() before
     * unlocking, and may re-acquire and repeat.
     *
     * NOTE(review): assignment targets, lock/map/state receivers, the TimeUnit
     * constant and the commas between "timeout" and "unit" are missing throughout
     * this copy (apparent extraction damage).
     */
    private class DriverLockResult
            implements AutoCloseable
    {
        // outcome of the acquisition attempt made in the constructor
        private final boolean acquired;
        private DriverLockResult(int timeoutTimeUnit unit)
        {
             = tryAcquire(timeoutunit);
        }
        // Attempts a timed tryLock; on success records the current thread under
        // the Driver monitor. Interruption restores the interrupt flag and counts
        // as "not acquired".
        private boolean tryAcquire(int timeoutTimeUnit unit)
        {
            boolean acquired = false;
            try {
                acquired = .tryLock(timeoutunit);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (acquired) {
                synchronized (Driver.this) {
                     = Thread.currentThread();
                }
            }
            return acquired;
        }
        // Accessor used by callers to test whether the lock attempt succeeded.
        public boolean wasAcquired()
        {
            return ;
        }
        @Override
        public void close()
        {
            if (!) {
                return;
            }
            boolean done = false;
            while (!done) {
                // assume this is the last pass unless the lock is re-acquired below
                done = true;
                // before releasing the lock, process any new sources and/or destroy the driver
                try {
                    try {
                        processNewSources();
                    }
                    finally {
                        // runs even if processNewSources() throws
                        destroyIfNecessary();
                    }
                }
                finally {
                    // clear the recorded value under the Driver monitor, then unlock
                    synchronized (Driver.this) {
                         = null;
                    }
                    .unlock();
                    // if new sources were added after we processed them, go around and try again
                    // in case someone else failed to acquire the lock and as a result won't update them
                    if (!.isEmpty() && .get() == . && tryAcquire(0, .)) {
                        done = false;
                    }
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X