// NOTE: web page navigation text that preceded the license header in the recovered snapshot has been removed.
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 // NOTE(review): the third-party and project imports (Logger, Duration, Ticker, TimeStat, ListenableFuture, ...) are
 // not present in this copy of the file and still need to be restored from the project.
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Sets.newConcurrentHashSet;
 import static io.airlift.concurrent.Threads.threadsNamed;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class TaskExecutor
 {
     private static final Logger log = Logger.get(TaskExecutor.class);
 
     // each task is guaranteed a minimum number of tasks
     private static final int GUARANTEED_SPLITS_PER_TASK = 3;
 
     // each time we run a split, run it for this length before returning to the pool
     private static final Duration SPLIT_RUN_QUANTA = new Duration(1, .);
 
     private static final AtomicLong NEXT_RUNNER_ID = new AtomicLong();
     private static final AtomicLong NEXT_WORKER_ID = new AtomicLong();
 
     private final ExecutorService executor;
     private final ThreadPoolExecutorMBean executorMBean;
 
     private final int runnerThreads;
     private final int minimumNumberOfTasks;
 
     private final Ticker ticker;
 
     @GuardedBy("this")
     private final List<TaskHandletasks;
 
     private final Set<PrioritizedSplitRunnerallSplits = new HashSet<>();
 
     private final AtomicLongArray completedTasksPerLevel = new AtomicLongArray(5);
 
     private final TimeStat queuedTime = new TimeStat();
     private final TimeStat wallTime = new TimeStat();
 
    private volatile boolean closed;
    @Inject
    public TaskExecutor(TaskManagerConfig config)
    {
        this(checkNotNull(config"config is null").getMaxShardProcessorThreads());
    }
    /**
     * Creates an executor with the given number of runner threads, using
     * {@code Ticker.systemTicker()} as the time source.
     *
     * @param runnerThreads number of runner threads; forwarded to the two-argument constructor
     */
    public TaskExecutor(int runnerThreads)
    {
        this(runnerThreads, Ticker.systemTicker());
    }
    public TaskExecutor(int runnerThreadsTicker ticker)
    {
        checkArgument(runnerThreads > 0, "runnerThreads must be at least 1");
        // we manages thread pool size directly, so create an unlimited pool
        this. = newCachedThreadPool(threadsNamed("task-processor-%d"));
        this. = runnerThreads;
        this. = checkNotNull(ticker"ticker is null");
        // we assume we need at least two tasks per runner thread to keep the system busy
        this. = 2 * this.;
        this. = new PriorityBlockingQueue<>(Runtime.getRuntime().availableProcessors() * 10);
        this. = new LinkedList<>();
    }
    public synchronized void start()
    {
        checkState(!"TaskExecutor is closed");
        for (int i = 0; i < i++) {
            addRunnerThread();
        }
    }
    @PreDestroy
    public synchronized void stop()
    {
         = true;
        .shutdownNow();
    }
    @Override
    public synchronized String toString()
    {
        return toStringHelper(this)
                .add("runnerThreads")
                .add("allSplits".size())
                .add("pendingSplits".size())
                .add("runningSplits".size())
                .add("blockedSplits".size())
                .toString();
    }
    private synchronized void addRunnerThread()
    {
        try {
            .execute(new Runner());
        }
        catch (RejectedExecutionException ignored) {
        }
    }
    public synchronized TaskHandle addTask(TaskId taskId)
    {
        TaskHandle taskHandle = new TaskHandle(checkNotNull(taskId"taskId is null"));
        .add(taskHandle);
        return taskHandle;
    }
    public synchronized void removeTask(TaskHandle taskHandle)
    {
        taskHandle.destroy();
        .remove(taskHandle);
        // record completed stats
        long threadUsageNanos = taskHandle.getThreadUsageNanos();
        int priorityLevel = calculatePriorityLevel(threadUsageNanos);
        .incrementAndGet(priorityLevel);
    }
    public synchronized List<ListenableFuture<?>> enqueueSplits(TaskHandle taskHandleboolean forceStartList<? extends SplitRunnertaskSplits)
    {
        List<ListenableFuture<?>> finishedFutures = new ArrayList<>(taskSplits.size());
        for (SplitRunner taskSplit : taskSplits) {
            PrioritizedSplitRunner prioritizedSplitRunner = new PrioritizedSplitRunner(taskHandletaskSplit);
            if (forceStart) {
                // Note: we do not record queued time for forced splits
                startSplit(prioritizedSplitRunner);
                // add the runner to the handle so it can be destroyed if the task is canceled
                taskHandle.recordRunningSplit(prioritizedSplitRunner);
            }
            else {
                // add this to the work queue for the task
                taskHandle.enqueueSplit(prioritizedSplitRunner);
                // if task is under the limit for gaurenteed splits, start one
                scheduleTaskIfNecessary(taskHandle);
                // if globally we have more resources, start more
                addNewEntrants();
            }
            finishedFutures.add(prioritizedSplitRunner.getFinishedFuture());
        }
        return finishedFutures;
    }
    private void splitFinished(PrioritizedSplitRunner split)
    {
        synchronized (this) {
            .remove(split);
            TaskHandle taskHandle = split.getTaskHandle();
            taskHandle.splitComplete(split);
            .add(Duration.nanosSince(split.createdNanos));
            scheduleTaskIfNecessary(taskHandle);
            addNewEntrants();
        }
        // call destroy outside of synchronized block as it is expensive and doesn't need a lock on the task executor
        split.destroy();
    }
    private synchronized void scheduleTaskIfNecessary(TaskHandle taskHandle)
    {
        // if task has less than the minimum guaranteed splits running,
        // immediately schedule a new split for this task.  This assures
        // that a task gets its fair amount of consideration (you have to
        // have splits to be considered for running on a thread).
        if (taskHandle.getRunningSplits() < ) {
            PrioritizedSplitRunner split = taskHandle.pollNextSplit();
            if (split != null) {
                startSplit(split);
                .add(Duration.nanosSince(split.createdNanos));
            }
        }
    }
    private synchronized void addNewEntrants()
    {
        int running = .size();
        for (int i = 0; i <  - runningi++) {
            PrioritizedSplitRunner split = pollNextSplitWorker();
            if (split == null) {
                break;
            }
            .add(Duration.nanosSince(split.createdNanos));
            startSplit(split);
        }
    }
    private synchronized void startSplit(PrioritizedSplitRunner split)
    {
        .add(split);
        .put(split);
    }
    private synchronized PrioritizedSplitRunner pollNextSplitWorker()
    {
        // todo find a better algorithm for this
        // find the first task that produces a split, then move that task to the
        // end of the task list, so we get round robin
        for (Iterator<TaskHandleiterator = .iterator(); iterator.hasNext(); ) {
            TaskHandle task = iterator.next();
            PrioritizedSplitRunner split = task.pollNextSplit();
            if (split != null) {
                // move task to end of list
                iterator.remove();
                // CAUTION: we are modifying the list in the loop which would normally
                // cause a ConcurrentModificationException but we exit immediately
                .add(task);
                return split;
            }
        }
        return null;
    }
    public static class TaskHandle
    {
        private final TaskId taskId;
        private final Queue<PrioritizedSplitRunnerqueuedSplits = new ArrayDeque<>(10);
        private final List<PrioritizedSplitRunnerrunningSplits = new ArrayList<>(10);
        private final AtomicLong taskThreadUsageNanos = new AtomicLong();
        private final AtomicInteger nextSplitId = new AtomicInteger();
        private TaskHandle(TaskId taskId)
        {
            this. = taskId;
        }
        private long addThreadUsageNanos(long durationNanos)
        {
            return .addAndGet(durationNanos);
        }
        private TaskId getTaskId()
        {
            return ;
        }
        private void destroy()
        {
            for (PrioritizedSplitRunner runningSplit : ) {
                runningSplit.destroy();
            }
            .clear();
            for (PrioritizedSplitRunner queuedSplit : ) {
                queuedSplit.destroy();
            }
            .clear();
        }
        private void enqueueSplit(PrioritizedSplitRunner split)
        {
            .add(split);
        }
        private void recordRunningSplit(PrioritizedSplitRunner split)
        {
            .add(split);
        }
        private int getRunningSplits()
        {
            return .size();
        }
        private long getThreadUsageNanos()
        {
            return .get();
        }
        private PrioritizedSplitRunner pollNextSplit()
        {
            PrioritizedSplitRunner split = .poll();
            if (split != null) {
                .add(split);
            }
            return split;
        }
        private void splitComplete(PrioritizedSplitRunner split)
        {
            .remove(split);
        }
        private int getNextSplitId()
        {
            return .getAndIncrement();
        }
        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("taskId")
                    .toString();
        }
    }
    private static class PrioritizedSplitRunner
            implements Comparable<PrioritizedSplitRunner>
    {
        private final long createdNanos = System.nanoTime();
        private final TaskHandle taskHandle;
        private final int splitId;
        private final long workerId;
        private final SplitRunner split;
        private final Ticker ticker;
        private final SettableFuture<?> finishedFuture = SettableFuture.create();
        private final AtomicBoolean destroyed = new AtomicBoolean();
        private final AtomicInteger priorityLevel = new AtomicInteger();
        private final AtomicLong threadUsageNanos = new AtomicLong();
        private final AtomicLong lastRun = new AtomicLong();
        private final AtomicLong start = new AtomicLong();
        private final AtomicLong cpuTime = new AtomicLong();
        private final AtomicLong processCalls = new AtomicLong();
        private PrioritizedSplitRunner(TaskHandle taskHandleSplitRunner splitTicker ticker)
        {
            this. = taskHandle;
            this. = taskHandle.getNextSplitId();
            this. = split;
            this. = ticker;
            this. = .getAndIncrement();
        }
        private TaskHandle getTaskHandle()
        {
            return ;
        }
        private ListenableFuture<?> getFinishedFuture()
        {
            return ;
        }
        public void destroy()
        {
            .set(true);
            try {
                .close();
            }
            catch (RuntimeException e) {
                .error(e"Error closing split for task %s".getTaskId());
            }
        }
        public boolean isFinished()
        {
            boolean finished = .isFinished();
            if (finished) {
                .set(null);
            }
            return finished || .get();
        }
        public ListenableFuture<?> process()
                throws Exception
        {
            try {
                .compareAndSet(0, System.currentTimeMillis());
                .incrementAndGet();
                CpuTimer timer = new CpuTimer();
                ListenableFuture<?> blocked = .processFor();
                CpuTimer.CpuDuration elapsed = timer.elapsedTime();
                // update priority level base on total thread usage of task
                long durationNanos = elapsed.getWall().roundTo();
                long threadUsageNanos = .addThreadUsageNanos(durationNanos);
                this..set(threadUsageNanos);
                .set(calculatePriorityLevel(threadUsageNanos));
                // record last run for prioritization within a level
                .set(.read());
                .addAndGet(elapsed.getCpu().roundTo());
                return blocked;
            }
            catch (Throwable e) {
                .setException(e);
                throw e;
            }
        }
        public boolean updatePriorityLevel()
        {
            int newPriority = calculatePriorityLevel(.getThreadUsageNanos());
            if (newPriority == .getAndSet(newPriority)) {
                return false;
            }
            // update thread usage while if level changed
            return true;
        }
        @Override
        public int compareTo(PrioritizedSplitRunner o)
        {
            int level = .get();
            int result = Ints.compare(levelo.priorityLevel.get());
            if (result != 0) {
                return result;
            }
            if (level < 4) {
                result = Long.compare(.get(), .get());
            }
            else {
                result = Long.compare(.get(), o.lastRun.get());
            }
            if (result != 0) {
                return result;
            }
            return Longs.compare(o.workerId);
        }
        public int getSplitId()
        {
            return ;
        }
        public String getInfo()
        {
            return String.format("Split %-15s-%d (start = %s, wall = %s ms, cpu = %s ms, calls = %s)",
                    .getTaskId(),
                    ,
                    .get(),
                    System.currentTimeMillis() - .get(),
                    (int) (.get() / 1.0e6),
                    .get());
        }
        @Override
        public String toString()
        {
            return String.format("Split %-15s-%d".getTaskId(), );
        }
    }
    private static int calculatePriorityLevel(long threadUsageNanos)
    {
        long millis = .toMillis(threadUsageNanos);
        int priorityLevel;
        if (millis < 1000) {
            priorityLevel = 0;
        }
        else if (millis < 10_000) {
            priorityLevel = 1;
        }
        else if (millis < 60_000) {
            priorityLevel = 2;
        }
        else if (millis < 300_000) {
            priorityLevel = 3;
        }
        else {
            priorityLevel = 4;
        }
        return priorityLevel;
    }
    private class Runner
            implements Runnable
    {
        private final long runnerId = .getAndIncrement();
        @Override
        public void run()
        {
            try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s")) {
                while (! && !Thread.currentThread().isInterrupted()) {
                    // select next worker
                    final PrioritizedSplitRunner split;
                    try {
                        split = .take();
                        if (split.updatePriorityLevel()) {
                            // priority level changed, return split to queue for re-prioritization
                            .put(split);
                            continue;
                        }
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try (SetThreadName splitName = new SetThreadName(split.getTaskHandle().getTaskId() + "-" + split.getSplitId())) {
                        .add(split);
                        boolean finished;
                        ListenableFuture<?> blocked;
                        try {
                            blocked = split.process();
                            finished = split.isFinished();
                        }
                        finally {
                            .remove(split);
                        }
                        if (finished) {
                            .debug("%s is finished"split.getInfo());
                            splitFinished(split);
                        }
                        else {
                            if (blocked.isDone()) {
                                .put(split);
                            }
                            else {
                                .add(split);
                                blocked.addListener(new Runnable()
                                {
                                    @Override
                                    public void run()
                                    {
                                        .remove(split);
                                        split.updatePriorityLevel();
                                        .put(split);
                                    }
                                }, );
                            }
                        }
                    }
                    catch (Throwable t) {
                        .error(t"Error processing %s"split.getInfo());
                        splitFinished(split);
                    }
                }
            }
            finally {
                // unless we have been closed, we need to replace this thread
                if (!) {
                    addRunnerThread();
                }
            }
        }
    }
    //
    // STATS
    //
    @Managed
    public synchronized int getTasks()
    {
        return .size();
    }
    @Managed
    public int getRunnerThreads()
    {
        return ;
    }
    @Managed
    public int getMinimumNumberOfTasks()
    {
        return ;
    }
    @Managed
    public synchronized int getTotalSplits()
    {
        return .size();
    }
    @Managed
    public int getPendingSplits()
    {
        return .size();
    }
    @Managed
    public int getRunningSplits()
    {
        return .size();
    }
    @Managed
    public int getBlockedSplits()
    {
        return .size();
    }
    @Managed
    public long getCompletedTasksLevel0()
    {
        return .get(0);
    }
    @Managed
    public long getCompletedTasksLevel1()
    {
        return .get(1);
    }
    @Managed
    public long getCompletedTasksLevel2()
    {
        return .get(2);
    }
    @Managed
    public long getCompletedTasksLevel3()
    {
        return .get(3);
    }
    @Managed
    public long getCompletedTasksLevel4()
    {
        return .get(4);
    }
    @Managed
    public long getRunningTasksLevel0()
    {
        return calculateRunningTasksForLevel(0);
    }
    @Managed
    public long getRunningTasksLevel1()
    {
        return calculateRunningTasksForLevel(1);
    }
    @Managed
    public long getRunningTasksLevel2()
    {
        return calculateRunningTasksForLevel(2);
    }
    @Managed
    public long getRunningTasksLevel3()
    {
        return calculateRunningTasksForLevel(3);
    }
    @Managed
    public long getRunningTasksLevel4()
    {
        return calculateRunningTasksForLevel(4);
    }
    @Managed
    @Nested
    public TimeStat getQueuedTime()
    {
        return ;
    }
    @Managed
    @Nested
    public TimeStat getWallTime()
    {
        return ;
    }
    private synchronized int calculateRunningTasksForLevel(int level)
    {
        int count = 0;
        for (TaskHandle task : ) {
            if (calculatePriorityLevel(task.getThreadUsageNanos()) == level) {
                count++;
            }
        }
        return count;
    }
    @Managed(description = "Task processor executor")
    @Nested
    {
        return ;
    }
}