// (Web page-capture navigation text removed; the source file begins with the license header below.)
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 import com.facebook.presto.execution.TaskExecutor.TaskHandle;
 import com.google.common.base.Throwables;
 import com.google.common.base.Ticker;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.SettableFuture;
 import io.airlift.stats.Distribution;
 import io.airlift.units.Duration;

 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;

 import static com.google.common.collect.Sets.newConcurrentHashSet;
 import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
 import static io.airlift.concurrent.Threads.threadsNamed;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class TaskExecutorSimulator
         implements Closeable
 {
     // Compile-time debug switches, both off by default.
     // NOTE(review): presumably these gate the per-task summary line and the per-split detail lines
     // in printTaskCompletion; the guarding conditions there lost their operands in this capture,
     // so confirm against the upstream file.
     private static final boolean PRINT_TASK_COMPLETION = false;
     private static final boolean PRINT_SPLIT_COMPLETION = false;
 
     public static void main(String[] args)
             throws Exception
     {
         try (TaskExecutorSimulator simulator = new TaskExecutorSimulator()) {
             simulator.run();
         }
     }
 
     private TaskExecutor taskExecutor;
 
     public TaskExecutorSimulator()
     {
 
          = new TaskExecutor(24, 48, new Ticker()
         {
             private final long start = System.nanoTime();
 
             @Override
             public long read()
             {
                 // run 10 times faster than reality
                 long now = System.nanoTime();
                 return (now - ) * 100;
             }
         });
         .start();
     }
 
     @Override
     public void close()
     {
         .stop();
         .shutdownNow();
     }
 
     public void run()
             throws Exception
     {
         Multimap<IntegerSimulationTasktasks = Multimaps.synchronizedListMultimap(ArrayListMultimap.<IntegerSimulationTask>create());
         Set<ListenableFuture<?>> finishFutures = newConcurrentHashSet();
        AtomicBoolean done = new AtomicBoolean();
        long start = System.nanoTime();
        // large tasks
        for (int userId = 0; userId < 2; userId++) {
            ListenableFuture<?> future = createUser("large_" + userId, 100, donetasks);
            finishFutures.add(future);
        }
        // small tasks
        for (int userId = 0; userId < 4; userId++) {
            ListenableFuture<?> future = createUser("small_" + userId, 5, donetasks);
            finishFutures.add(future);
        }
        // tiny tasks
        for (int userId = 0; userId < 1; userId++) {
            ListenableFuture<?> future = createUser("tiny_" + userId, 1, donetasks);
            finishFutures.add(future);
        }
        // warm up
        for (int i = 0; i < 30; i++) {
            ..sleep(1000);
            ..println();
        }
        tasks.clear();
        // run
        for (int i = 0; i < 60; i++) {
            ..sleep(1000);
            ..println();
        }
        // capture finished tasks
        Map<IntegerCollection<SimulationTask>> middleTasks;
        synchronized (tasks) {
            middleTasks = new TreeMap<>(tasks.asMap());
        }
        // wait for finish
        done.set(true);
        Futures.allAsList(finishFutures).get(1, .);
        Duration runtime = Duration.nanosSince(start).convertToMostSuccinctTimeUnit();
        synchronized (this) {
            ..println();
            ..println("Simulation finished in  " + runtime);
            ..println();
            for (Entry<IntegerCollection<SimulationTask>> entry : middleTasks.entrySet()) {
                Distribution durationDistribution = new Distribution();
                Distribution taskParallelismDistribution = new Distribution();
                for (SimulationTask task : entry.getValue()) {
                    long taskStart = .;
                    long taskEnd = 0;
                    long totalCpuTime = 0;
                    for (SimulationSplit split : task.getSplits()) {
                        taskStart = Math.min(taskStartsplit.getStartNanos());
                        taskEnd = Math.max(taskEndsplit.getDoneNanos());
                        totalCpuTime += ..toNanos(split.getRequiredProcessMillis());
                    }
                    Duration taskDuration = new Duration(taskEnd - taskStart).convertTo(.);
                    durationDistribution.add(taskDuration.toMillis());
                    double taskParallelism = 1.0 * totalCpuTime / (taskEnd - taskStart);
                    taskParallelismDistribution.add((long) (taskParallelism * 100));
                }
                ..println("Splits " + entry.getKey() + ": Completed " + entry.getValue().size());
                Map<DoubleLongdurationPercentiles = durationDistribution.getPercentiles();
                ..printf("   wall time ms :: p01 %4s :: p05 %4s :: p10 %4s :: p97 %4s :: p50 %4s :: p75 %4s :: p90 %4s :: p95 %4s :: p99 %4s\n",
                        durationPercentiles.get(0.01),
                        durationPercentiles.get(0.05),
                        durationPercentiles.get(0.10),
                        durationPercentiles.get(0.25),
                        durationPercentiles.get(0.50),
                        durationPercentiles.get(0.75),
                        durationPercentiles.get(0.90),
                        durationPercentiles.get(0.95),
                        durationPercentiles.get(0.99));
                Map<DoubleLongparallelismPercentiles = taskParallelismDistribution.getPercentiles();
                ..printf("    parallelism :: p99 %4.2f :: p95 %4.2f :: p90 %4.2f :: p75 %4.2f :: p50 %4.2f :: p25 %4.2f :: p10 %4.2f :: p05 %4.2f :: p01 %4.2f\n",
                        parallelismPercentiles.get(0.99) / 100.0,
                        parallelismPercentiles.get(0.95) / 100.0,
                        parallelismPercentiles.get(0.90) / 100.0,
                        parallelismPercentiles.get(0.75) / 100.0,
                        parallelismPercentiles.get(0.50) / 100.0,
                        parallelismPercentiles.get(0.25) / 100.0,
                        parallelismPercentiles.get(0.10) / 100.0,
                        parallelismPercentiles.get(0.05) / 100.0,
                        parallelismPercentiles.get(0.01) / 100.0);
            }
        }
        Thread.sleep(10);
    }
    private ListenableFuture<?> createUser(final String userId,
            final int splitsPerTask,
            final TaskExecutor taskExecutor,
            final AtomicBoolean done,
            final Multimap<IntegerSimulationTasktasks)
    {
        return .submit(new Callable<Void>()
        {
            @Override
            public Void call()
                    throws Exception
            {
                long taskId = 0;
                while (!done.get()) {
                    SimulationTask task = new SimulationTask(taskExecutornew TaskId(userId"0", String.valueOf(taskId++)));
                    task.schedule(splitsPerTasknew Duration(0, )).get();
                    task.destroy();
                    printTaskCompletion(task);
                    tasks.put(splitsPerTasktask);
                }
                return null;
            }
        });
    }
    private synchronized void printTaskCompletion(SimulationTask task)
    {
        if (!) {
            return;
        }
        long taskStart = .;
        long taskEnd = 0;
        long taskQueuedTime = 0;
        long totalCpuTime = 0;
        for (SimulationSplit split : task.getSplits()) {
            taskStart = Math.min(taskStartsplit.getStartNanos());
            taskEnd = Math.max(taskEndsplit.getDoneNanos());
            taskQueuedTime += split.getQueuedNanos();
            totalCpuTime += ..toNanos(split.getRequiredProcessMillis());
        }
        ..printf("%-12s %8s %8s %.2f\n",
                task.getTaskId() + ":",
                new Duration(taskQueuedTime).convertTo(.),
                new Duration(taskEnd - taskStart).convertTo(.),
                1.0 * totalCpuTime / (taskEnd - taskStart)
        );
        // print split info
        if () {
            for (SimulationSplit split : task.getSplits()) {
                Duration totalQueueTime = new Duration(split.getQueuedNanos(), ).convertTo(.);
                Duration executionWallTime = new Duration(split.getDoneNanos() - split.getStartNanos(), ).convertTo(.);
                Duration totalWallTime = new Duration(split.getDoneNanos() - split.getCreatedNanos(), ).convertTo(.);
                ..printf("         %8s %8s %8s\n"totalQueueTimeexecutionWallTimetotalWallTime);
            }
            ..println();
        }
    }
    private static class SimulationTask
    {
        private final long createdNanos = System.nanoTime();
        private final TaskExecutor taskExecutor;
        private final Object taskId;
        private final List<SimulationSplitsplits = new ArrayList<>();
        private final List<ListenableFuture<?>> splitFutures = new ArrayList<>();
        private final TaskHandle taskHandle;
        private SimulationTask(TaskExecutor taskExecutorTaskId taskId)
        {
            this. = taskExecutor;
            this. = taskId;
             = taskExecutor.addTask(taskId);
        }
        public void destroy()
        {
            .removeTask();
        }
        public ListenableFuture<?> schedule(final int splitsExecutorService executorfinal Duration entryDelay)
        {
            final SettableFuture<Voidfuture = SettableFuture.create();
            executor.submit(new Runnable()
            {
                @Override
                public void run()
                {
                    try {
                        for (int splitId = 0; splitId < splitssplitId++) {
                            SimulationSplit split = new SimulationSplit(new Duration(80, .), new Duration(1, .));
                            SimulationTask.this..add(split);
                            .addAll(.enqueueSplits(false, ImmutableList.of(split)));
                            Thread.sleep(entryDelay.toMillis());
                        }
                        Futures.allAsList().get();
                        future.set(null);
                    }
                    catch (Throwable e) {
                        future.setException(e);
                        throw Throwables.propagate(e);
                    }
                }
            });
            return future;
        }
        private Object getTaskId()
        {
            return ;
        }
        private long getCreatedNanos()
        {
            return ;
        }
        private List<SimulationSplitgetSplits()
        {
            return ;
        }
    }
    private static class SimulationSplit
            implements SplitRunner
    {
        private final long requiredProcessMillis;
        private final long processMillisPerCall;
        private final AtomicLong completedProcessMillis = new AtomicLong();
        private final AtomicInteger calls = new AtomicInteger(0);
        private final long createdNanos = System.nanoTime();
        private final AtomicLong startNanos = new AtomicLong(-1);
        private final AtomicLong doneNanos = new AtomicLong(-1);
        private final AtomicLong queuedNanos = new AtomicLong();
        private long lastCallNanos = ;
        private SimulationSplit(Duration requiredProcessTimeDuration processTimePerCall)
        {
            this. = requiredProcessTime.toMillis();
            this. = processTimePerCall.toMillis();
        }
        private long getRequiredProcessMillis()
        {
            return ;
        }
        private long getCreatedNanos()
        {
            return ;
        }
        private long getStartNanos()
        {
            return .get();
        }
        private long getDoneNanos()
        {
            return .get();
        }
        private long getQueuedNanos()
        {
            return .get();
        }
        @Override
        public boolean isFinished()
        {
            return .get() >= 0;
        }
        @Override
        public void close()
        {
        }
        @Override
        public ListenableFuture<?> processFor(Duration duration)
                throws Exception
        {
            long callStart = System.nanoTime();
            .compareAndSet(-1, callStart);
            .incrementAndGet();
            .addAndGet(callStart - );
            long processMillis = Math.min( - .get(), );
            ..sleep(processMillis);
            long completedMillis = .addAndGet(processMillis);
            boolean isFinished = completedMillis >= ;
            long callEnd = System.nanoTime();
             = callEnd;
            if (isFinished) {
                .compareAndSet(-1, callEnd);
            }
            return Futures.immediateCheckedFuture(null);
        }
    }
}