Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.operator;
 
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;

 import static com.facebook.presto.RowPagesBuilder.rowPagesBuilder;
 import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
 import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
 import static com.facebook.presto.testing.TestingTaskContext.createTaskContext;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Throwables.getRootCause;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static java.util.concurrent.Executors.newCachedThreadPool;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 @Test(singleThreaded = true)
 public class TestDriver
 {
     // Per-test executor; presumably the cached daemon thread pool built in setUp() and stopped in tearDown().
     private ExecutorService executor;
     // Per-test driver context; the tests compare Driver.getDriverContext() against a context with assertSame,
     // presumably this field.
     private DriverContext driverContext;
 
     @BeforeMethod
     public void setUp()
             throws Exception
     {
          = newCachedThreadPool(daemonThreadsNamed("test-%s"));
 
                 .addPipelineContext(truetrue)
                 .addDriverContext();
     }
 
     @AfterMethod
     public void tearDown()
     {
         .shutdownNow();
     }
 
     @Test
     public void testNormalFinish()
     {
         List<Typetypes = ImmutableList.<Type>of();
         ValuesOperator source = new ValuesOperator(.addOperatorContext(0, "values"), typesrowPagesBuilder(types)
                 .addSequencePage(10, 20, 30, 40)
                 .build());
 
         MaterializingOperator sink = createSinkOperator(source);
         Driver driver = new Driver(sourcesink);
 
         assertSame(driver.getDriverContext(), );
 
         assertFalse(driver.isFinished());
         ListenableFuture<?> blocked = driver.processFor(new Duration(1, .));
        assertTrue(blocked.isDone());
        assertTrue(driver.isFinished());
        assertTrue(sink.isFinished());
        assertTrue(source.isFinished());
    }
    @Test
    public void testAbruptFinish()
    {
        List<Typetypes = ImmutableList.<Type>of();
        ValuesOperator source = new ValuesOperator(.addOperatorContext(0, "values"), typesrowPagesBuilder(types)
                .addSequencePage(10, 20, 30, 40)
                .build());
        MaterializingOperator sink = createSinkOperator(source);
        Driver driver = new Driver(sourcesink);
        assertSame(driver.getDriverContext(), );
        assertFalse(driver.isFinished());
        driver.close();
        assertTrue(driver.isFinished());
        // finish is only called in normal operations
        assertFalse(source.isFinished());
        assertFalse(sink.isFinished());
        // close is always called (values operator doesn't have a closed state)
        assertTrue(sink.isClosed());
    }
    @Test
    public void testAddSourceFinish()
    {
        PlanNodeId sourceId = new PlanNodeId("source");
        final List<Typetypes = ImmutableList.<Type>of();
        TableScanOperator source = new TableScanOperator(.addOperatorContext(99, "values"),
                sourceId,
                new PageSourceProvider()
                {
                    @Override
                    public ConnectorPageSource createPageSource(Split splitList<ColumnHandlecolumns)
                    {
                        return new FixedPageSource(rowPagesBuilder(types)
                                .addSequencePage(10, 20, 30, 40)
                                .build());
                    }
                },
                types,
                ImmutableList.<ColumnHandle>of());
        MaterializingOperator sink = createSinkOperator(source);
        Driver driver = new Driver(sourcesink);
        assertSame(driver.getDriverContext(), );
        assertFalse(driver.isFinished());
        assertFalse(driver.processFor(new Duration(1, .)).isDone());
        assertFalse(driver.isFinished());
        driver.updateSource(new TaskSource(sourceId, ImmutableSet.of(new ScheduledSplit(0, newMockSplit())), true));
        assertFalse(driver.isFinished());
        assertTrue(driver.processFor(new Duration(1, .)).isDone());
        assertTrue(driver.isFinished());
        assertTrue(sink.isFinished());
        assertTrue(source.isFinished());
    }
    @Test
            throws Exception
    {
        BrokenOperator brokenOperator = new BrokenOperator(.addOperatorContext(0, "source"), false);
        final Driver driver = new Driver(brokenOperatorcreateSinkOperator(brokenOperator));
        assertSame(driver.getDriverContext(), );
        // block thread in operator processing
        Future<BooleandriverProcessFor = .submit(new Callable<Boolean>()
        {
            @Override
            public Boolean call()
                    throws Exception
            {
                return driver.processFor(new Duration(1, .)).isDone();
            }
        });
        brokenOperator.waitForLocked();
        driver.close();
        assertTrue(driver.isFinished());
        try {
            driverProcessFor.get(1, .);
            fail("Expected InterruptedException");
        }
        catch (ExecutionException e) {
            checkArgument(getRootCause(einstanceof InterruptedException"Expected root cause exception to be an instance of InterruptedException");
        }
    }
    @Test
            throws Exception
    {
        BrokenOperator brokenOperator = new BrokenOperator(.addOperatorContext(0, "source"), true);
        final Driver driver = new Driver(brokenOperatorcreateSinkOperator(brokenOperator));
        assertSame(driver.getDriverContext(), );
        // block thread in operator close
        Future<BooleandriverClose = .submit(new Callable<Boolean>()
        {
            @Override
            public Boolean call()
                    throws Exception
            {
                driver.close();
                return true;
            }
        });
        brokenOperator.waitForLocked();
        assertTrue(driver.processFor(new Duration(1, .)).isDone());
        assertTrue(driver.isFinished());
        brokenOperator.unlock();
        assertTrue(driverClose.get());
    }
    @Test
    public void testBrokenOperatorAddSource()
            throws Exception
    {
        PlanNodeId sourceId = new PlanNodeId("source");
        final List<Typetypes = ImmutableList.<Type>of();
        // create a table scan operator that does not block, which will cause the driver loop to busy wait
        TableScanOperator source = new NotBlockedTableScanOperator(.addOperatorContext(99, "values"),
                sourceId,
                new PageSourceProvider()
                {
                    @Override
                    public ConnectorPageSource createPageSource(Split splitList<ColumnHandlecolumns)
                    {
                        return new FixedPageSource(rowPagesBuilder(types)
                                .addSequencePage(10, 20, 30, 40)
                                .build());
                    }
                },
                types,
                ImmutableList.<ColumnHandle>of());
        BrokenOperator brokenOperator = new BrokenOperator(.addOperatorContext(0, "source"));
        final Driver driver = new Driver(sourcebrokenOperator);
        // block thread in operator processing
        Future<BooleandriverProcessFor = .submit(new Callable<Boolean>()
        {
            @Override
            public Boolean call()
                    throws Exception
            {
                return driver.processFor(new Duration(1, .)).isDone();
            }
        });
        brokenOperator.waitForLocked();
        assertSame(driver.getDriverContext(), );
        assertFalse(driver.isFinished());
        // processFor always returns NOT_BLOCKED, because DriveLockResult was not acquired
        assertTrue(driver.processFor(new Duration(1, .)).isDone());
        assertFalse(driver.isFinished());
        driver.updateSource(new TaskSource(sourceId, ImmutableSet.of(new ScheduledSplit(0, newMockSplit())), true));
        assertFalse(driver.isFinished());
        // processFor always returns NOT_BLOCKED, because DriveLockResult was not acquired
        assertTrue(driver.processFor(new Duration(1, .)).isDone());
        assertFalse(driver.isFinished());
        driver.close();
        assertTrue(driver.isFinished());
        try {
            driverProcessFor.get(1, .);
            fail("Expected InterruptedException");
        }
        catch (ExecutionException e) {
            checkArgument(getRootCause(einstanceof InterruptedException"Expected root cause exception to be an instance of InterruptedException");
        }
    }
    private static Split newMockSplit()
    {
        return new Split("test"new MockSplit());
    }
    {
        return new MaterializingOperator(.addOperatorContext(1, "sink"), source.getTypes());
    }
    private static class BrokenOperator
            implements OperatorCloseable
    {
        private final OperatorContext operatorContext;
        private final ReentrantLock lock = new ReentrantLock();
        private final CountDownLatch lockedLatch = new CountDownLatch(1);
        private final CountDownLatch unlockLatch = new CountDownLatch(1);
        private final boolean lockForClose;
        private BrokenOperator(OperatorContext operatorContext)
        {
            this(operatorContextfalse);
        }
        private BrokenOperator(OperatorContext operatorContextboolean lockForClose)
        {
            this. = operatorContext;
            this. = lockForClose;
        }
        @Override
        public OperatorContext getOperatorContext()
        {
            return ;
        }
        public void unlock()
        {
            .countDown();
        }
        private void waitForLocked()
        {
            try {
                assertTrue(.await(10, .));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted"e);
            }
        }
        private void waitForUnlock()
        {
            try {
                assertTrue(.tryLock(1, .));
                try {
                    .countDown();
                    assertTrue(.await(5, .));
                }
                finally {
                    .unlock();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted"e);
            }
        }
        @Override
        public List<TypegetTypes()
        {
            return ImmutableList.of();
        }
        @Override
        public void finish()
        {
            waitForUnlock();
        }
        @Override
        public boolean isFinished()
        {
            waitForUnlock();
            return true;
        }
        @Override
        public ListenableFuture<?> isBlocked()
        {
            waitForUnlock();
            return ;
        }
        @Override
        public boolean needsInput()
        {
            waitForUnlock();
            return false;
        }
        @Override
        public void addInput(Page page)
        {
            waitForUnlock();
        }
        @Override
        public Page getOutput()
        {
            waitForUnlock();
            return null;
        }
        @Override
        public void close()
                throws IOException
        {
            if () {
                waitForUnlock();
            }
        }
    }
    private static class NotBlockedTableScanOperator
            extends TableScanOperator
    {
        public NotBlockedTableScanOperator(
                OperatorContext operatorContext,
                PlanNodeId planNodeId,
                PageSourceProvider pageSourceProvider,
                List<Typetypes,
                Iterable<ColumnHandlecolumns)
        {
            super(operatorContextplanNodeIdpageSourceProvidertypescolumns);
        }
        @Override
        public ListenableFuture<?> isBlocked()
        {
            return ;
        }
    }
    private static class MockSplit
            implements ConnectorSplit
    {
        @Override
        public boolean isRemotelyAccessible()
        {
            return false;
        }
        @Override
        public List<HostAddressgetAddresses()
        {
            return ImmutableList.of();
        }
        @Override
        public Object getInfo()
        {
            return null;
        }
    }
New to GrepCode? Check out our FAQ X