Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
 /*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

 import static com.google.common.collect.Iterables.getOnlyElement;
 import static org.testng.Assert.assertEquals;
 
 public class TaskExecutorTest
 {
     @Test(invocationCount = 100)
     public void test()
             throws Exception
     {
         TaskExecutor taskExecutor = new TaskExecutor(4, 8);
         taskExecutor.start();
 
         try {
             TaskHandle taskHandle = taskExecutor.addTask(new TaskId("test""test""test"));
 
             final Phaser beginPhase = new Phaser();
             beginPhase.register();
             final Phaser verificationComplete = new Phaser();
             verificationComplete.register();
 
             // add two jobs
             TestingJob driver1 = new TestingJob(beginPhaseverificationComplete, 10);
             ListenableFuture<?> future1 = getOnlyElement(taskExecutor.enqueueSplits(taskHandletrue, ImmutableList.of(driver1)));
             TestingJob driver2 = new TestingJob(beginPhaseverificationComplete, 10);
             ListenableFuture<?> future2 = getOnlyElement(taskExecutor.enqueueSplits(taskHandletrue, ImmutableList.of(driver2)));
             assertEquals(driver1.getCompletedPhases(), 0);
             assertEquals(driver2.getCompletedPhases(), 0);
 
             // verify worker have arrived but haven't processed yet
             beginPhase.arriveAndAwaitAdvance();
             assertEquals(driver1.getCompletedPhases(), 0);
             assertEquals(driver2.getCompletedPhases(), 0);
             verificationComplete.arriveAndAwaitAdvance();
 
             // advance one phase and verify
             beginPhase.arriveAndAwaitAdvance();
             assertEquals(driver1.getCompletedPhases(), 1);
             assertEquals(driver2.getCompletedPhases(), 1);
 
             verificationComplete.arriveAndAwaitAdvance();
 
             // add one more job
             TestingJob driver3 = new TestingJob(beginPhaseverificationComplete, 10);
             ListenableFuture<?> future3 = getOnlyElement(taskExecutor.enqueueSplits(taskHandlefalse, ImmutableList.of(driver3)));
 
             // advance one phase and verify
             beginPhase.arriveAndAwaitAdvance();
             assertEquals(driver1.getCompletedPhases(), 2);
             assertEquals(driver2.getCompletedPhases(), 2);
             assertEquals(driver3.getCompletedPhases(), 0);
             verificationComplete.arriveAndAwaitAdvance();
 
             // advance to the end of the first two task and verify
             beginPhase.arriveAndAwaitAdvance();
             for (int i = 0; i < 7; i++) {
                 verificationComplete.arriveAndAwaitAdvance();
                 beginPhase.arriveAndAwaitAdvance();
                 assertEquals(beginPhase.getPhase(), verificationComplete.getPhase() + 1);
             }
             assertEquals(driver1.getCompletedPhases(), 10);
             assertEquals(driver2.getCompletedPhases(), 10);
             assertEquals(driver3.getCompletedPhases(), 8);
             future1.get(1, .);
             future2.get(1, .);
             verificationComplete.arriveAndAwaitAdvance();
 
             // advance two more times and verify
             beginPhase.arriveAndAwaitAdvance();
             verificationComplete.arriveAndAwaitAdvance();
             beginPhase.arriveAndAwaitAdvance();
             assertEquals(driver1.getCompletedPhases(), 10);
             assertEquals(driver2.getCompletedPhases(), 10);
             assertEquals(driver3.getCompletedPhases(), 10);
            future3.get(1, .);
            verificationComplete.arriveAndAwaitAdvance();
            assertEquals(driver1.getFirstPhase(), 0);
            assertEquals(driver2.getFirstPhase(), 0);
            assertEquals(driver3.getFirstPhase(), 2);
            assertEquals(driver1.getLastPhase(), 10);
            assertEquals(driver2.getLastPhase(), 10);
            assertEquals(driver3.getLastPhase(), 12);
        }
        finally {
            taskExecutor.stop();
        }
    }
    @Test
    public void testTaskHandle()
            throws Exception
    {
        TaskExecutor taskExecutor = new TaskExecutor(4, 8);
        taskExecutor.start();
        try {
            TaskHandle taskHandle = taskExecutor.addTask(new TaskId("test""test""test"));
            Phaser beginPhase = new Phaser();
            beginPhase.register();
            Phaser verificationComplete = new Phaser();
            verificationComplete.register();
            TestingJob driver = new TestingJob(beginPhaseverificationComplete, 10);
            // force enqueue a split
            taskExecutor.enqueueSplits(taskHandletrue, ImmutableList.of(driver));
            assertEquals(taskHandle.getRunningSplits(), 0);
            // normal enqueue a split
            taskExecutor.enqueueSplits(taskHandlefalse, ImmutableList.of(driver));
            assertEquals(taskHandle.getRunningSplits(), 1);
        }
        finally {
            taskExecutor.stop();
        }
    }
    private static class TestingJob
            implements SplitRunner
    {
        private final Phaser awaitWorkers;
        private final Phaser awaitVerifiers;
        private final int requiredPhases;
        private final AtomicInteger completedPhases = new AtomicInteger();
        private final AtomicInteger firstPhase = new AtomicInteger(-1);
        private final AtomicInteger lastPhase = new AtomicInteger(-1);
        public TestingJob(Phaser awaitWorkersPhaser awaitVerifiersint requiredPhases)
        {
            this. = awaitWorkers;
            this. = awaitVerifiers;
            this. = requiredPhases;
            awaitWorkers.register();
            awaitVerifiers.register();
        }
        private int getFirstPhase()
        {
            return .get();
        }
        private int getLastPhase()
        {
            return .get();
        }
        private int getCompletedPhases()
        {
            return .get();
        }
        @Override
        public ListenableFuture<?> processFor(Duration duration)
                throws Exception
        {
            int phase = .arriveAndAwaitAdvance();
            .compareAndSet(-1, phase - 1);
            .set(phase);
            .arriveAndAwaitAdvance();
            .getAndIncrement();
            return Futures.immediateFuture(null);
        }
        @Override
        public boolean isFinished()
        {
            boolean isFinished = .get() >= ;
            if (isFinished) {
                .arriveAndDeregister();
                .arriveAndDeregister();
            }
            return isFinished;
        }
        @Override
        public void close()
        {
        }
    }
New to GrepCode? Check out our FAQ X