// NOTE(review): the lines below were GrepCode page chrome captured by the scraper,
// not part of the original source file — preserved here as a comment:
//   "Start line: / End line: / Snippet Preview / Snippet HTML Code / Stack Overflow Questions"
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static com.facebook.presto.OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.facebook.presto.util.Failures.toFailures;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
 
 @Test(singleThreaded = true)
 public class TestSqlStageExecution
 {
     public static final TaskId OUT = new TaskId("query""stage""out");
     private NodeTaskMap nodeTaskMap;
     private InMemoryNodeManager nodeManager;
     private NodeScheduler nodeScheduler;
     private LocationFactory locationFactory;
    public void setUp()
            throws Exception
    {
         = new InMemoryNodeManager();
        ImmutableList.Builder<NodenodeBuilder = ImmutableList.builder();
        nodeBuilder.add(new PrestoNode("other1", URI.create("http://127.0.0.1:11"), .));
        nodeBuilder.add(new PrestoNode("other2", URI.create("http://127.0.0.1:12"), .));
        nodeBuilder.add(new PrestoNode("other3", URI.create("http://127.0.0.1:13"), .));
        ImmutableList<Nodenodes = nodeBuilder.build();
        .addNode("foo"nodes);
        NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig()
                .setMaxSplitsPerNode(20)
                .setIncludeCoordinator(false)
                .setMaxPendingSplitsPerNodePerTask(10);
         = new NodeTaskMap();
         = new NodeScheduler(nodeSchedulerConfig);
         = new MockLocationFactory();
         = TestingSplit::createLocalSplit;
    }
    @Test(expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*No nodes available to run query")
    public void testExcludeCoordinator()
            throws Exception
    {
        InMemoryNodeManager nodeManager = new InMemoryNodeManager();
        NodeScheduler nodeScheduler = new NodeScheduler(nodeManagernew NodeSchedulerConfig().setIncludeCoordinator(false), );
        // Start sql stage execution
        StageExecutionPlan tableScanPlan = createTableScanPlan("test", 20, TestingSplit::createEmptySplit);
        SqlStageExecution sqlStageExecution = createSqlStageExecution(nodeScheduler, 2, tableScanPlan);
        Future<?> future = sqlStageExecution.start();
        future.get(1, .);
    }
    @Test
    public void testSplitAssignment()
            throws Exception
    {
        // Start sql stage execution (schedule 15 splits in batches of 2), there are 3 nodes, each node should get 5 splits
        StageExecutionPlan tableScanPlan = createTableScanPlan("test", 15, );
        SqlStageExecution sqlStageExecution1 = createSqlStageExecution(, 2, tableScanPlan);
        Future<?> future1 = sqlStageExecution1.start();
        future1.get(1, .);
        for (RemoteTask remoteTask : sqlStageExecution1.getAllTasks()) {
            assertEquals(remoteTask.getPartitionedSplitCount(), 5);
        }
        // Add new node
        Node additionalNode = new PrestoNode("other4", URI.create("http://127.0.0.1:14"), .);
        .addNode("foo"additionalNode);
        // Schedule next query with 5 splits. Since the new node does not have any splits, all 5 splits are assigned to the new node
        StageExecutionPlan tableScanPlan2 = createTableScanPlan("test", 5, );
        SqlStageExecution sqlStageExecution2 = createSqlStageExecution(, 5, tableScanPlan2);
        Future<?> future2 = sqlStageExecution2.start();
        future2.get(1, .);
        List<RemoteTasktasks2 = sqlStageExecution2.getTasks(additionalNode);
        RemoteTask task = Iterables.getFirst(tasks2null);
        assertNotNull(task);
        assertEquals(task.getPartitionedSplitCount(), 5);
    }
    @Test
            throws Exception
    {
        // Start sql stage execution with 100 splits. Only 20 will be scheduled on each node as that is the maxSplitsPerNode
        StageExecutionPlan tableScanPlan = createTableScanPlan("test", 100, );
        SqlStageExecution sqlStageExecution1 = createSqlStageExecution(, 100, tableScanPlan);
        Future<?> future1 = sqlStageExecution1.start();
        // The stage scheduler will block and this will cause a timeout exception
        try {
            future1.get(2, .);
        }
        catch (TimeoutException e) {
            // expected
        }
        for (RemoteTask task : sqlStageExecution1.getAllTasks()) {
            assertEquals(task.getPartitionedSplitCount(), 20);
        }
    }
    private SqlStageExecution createSqlStageExecution(NodeScheduler nodeSchedulerint splitBatchSizeStageExecutionPlan tableScanPlan)
    {
        ExecutorService remoteTaskExecutor = newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s"));
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor);
        ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("stageExecutor-%s"));
        OutputBuffers outputBuffers = 
                .withBuffer(new UnpartitionedPagePartitionFunction())
                .withNoMoreBufferIds();
        return new SqlStageExecution(new QueryId("query"),
                ,
                tableScanPlan,
                nodeScheduler,
                remoteTaskFactory,
                ,
                splitBatchSize,
                8,      // initialHashPartitions
                executor,
                ,
                outputBuffers);
    }
    @Test(enabled = false)
    public void testYieldCausesFullSchedule()
            throws Exception
    {
        ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("test-%s"));
        SqlStageExecution stageExecution = null;
        try {
            StageExecutionPlan joinPlan = createJoinPlan("A");
            InMemoryNodeManager nodeManager = new InMemoryNodeManager();
            nodeManager.addNode("foo"new PrestoNode("other", URI.create("http://127.0.0.1:11"), .));
            OutputBuffers outputBuffers = 
                    .withBuffer(new UnpartitionedPagePartitionFunction())
                    .withNoMoreBufferIds();
            stageExecution = new SqlStageExecution(new QueryId("query"),
                    new MockLocationFactory(),
                    joinPlan,
                    new NodeScheduler(nodeManagernew NodeSchedulerConfig(), ),
                    new MockRemoteTaskFactory(executor),
                    ,
                    1000,
                    8,
                    executor,
                    ,
                    outputBuffers);
            Future<?> future = stageExecution.start();
            long start = System.nanoTime();
            while (true) {
                StageInfo stageInfo = stageExecution.getStageInfo();
                assertEquals(stageInfo.getState(), .);
                StageInfo tableScanInfo = stageInfo.getSubStages().get(0);
                StageState tableScanState = tableScanInfo.getState();
                switch (tableScanState) {
                    case :
                    case :
                    case :
                        break;
                    case :
                        // there should be two tasks (even though only one can ever be used)
                        assertEquals(stageInfo.getTasks().size(), 2);
                        assertEquals(tableScanInfo.getTasks().size(), 1);
                        assertEquals(tableScanInfo.getTasks().get(0).getOutputBuffers().getState(), .);
                        return;
                    case :
                    case :
                    case :
                        fail("Unexpected state for table scan stage " + tableScanState);
                        break;
                }
                if (..toSeconds(System.nanoTime() - start) > 1) {
                    fail("Expected test to complete within 1 second");
                }
                try {
                    future.get(50, .);
                }
                catch (TimeoutException e) {
                }
            }
        }
        finally {
            if (stageExecution != null) {
                stageExecution.cancel();
            }
            executor.shutdownNow();
        }
    }
    private StageExecutionPlan createJoinPlan(String planId)
    {
        // create table scan for build data with a single split, so it is only waiting on the no-more buffers call
        StageExecutionPlan build = createTableScanPlan("build", 1, );
        // create an exchange to read the build data
        RemoteSourceNode buildSource = new RemoteSourceNode(new PlanNodeId(planId + "-build"),
                build.getFragment().getId(),
                ImmutableList.copyOf(build.getFragment().getSymbols().keySet()));
        // create table scan for probe data with three splits, so it will not send the no-more buffers call
        StageExecutionPlan probe = createTableScanPlan("probe", 10, );
        // create an exchange to read the probe data
        RemoteSourceNode probeSource = new RemoteSourceNode(new PlanNodeId(planId + "-probe"),
                probe.getFragment().getId(),
                ImmutableList.copyOf(probe.getFragment().getSymbols().keySet()));
        // join build and probe
        JoinNode joinNode = new JoinNode(new PlanNodeId(planId), ..probeSourcebuildSource, ImmutableList.<EquiJoinClause>of(), Optional.empty(), Optional.empty());
        PlanFragment joinPlan = new PlanFragment(
                new PlanFragmentId(planId),
                joinNode,
                probe.getFragment().getSymbols(), // this is wrong, but it works
                joinNode.getOutputSymbols(),
                .,
                new PlanNodeId(planId),
                .,
                ImmutableList.<Symbol>of(),
                Optional.empty());
        return new StageExecutionPlan(joinPlan,
                probe.getDataSource(),
                ImmutableList.of(probebuild)
        );
    }
    private static StageExecutionPlan createTableScanPlan(String planIdint splitCountSupplier<ConnectorSplitsplitFactory)
    {
        Symbol symbol = new Symbol("column");
        // table scan with splitCount splits
        PlanNodeId tableScanNodeId = new PlanNodeId(planId);
        PlanFragment testFragment = new PlanFragment(
                new PlanFragmentId(planId),
                new TableScanNode(
                        tableScanNodeId,
                        new TableHandle("test"new TestingTableHandle()),
                        ImmutableList.of(symbol),
                        ImmutableMap.of(symbolnew TestingColumnHandle("column")),
                        Optional.empty(),
                        TupleDomain.all(),
                        null),
                ImmutableMap.<SymbolType>of(symbol),
                ImmutableList.of(symbol),
                .,
                tableScanNodeId,
                .,
                ImmutableList.<Symbol>of(),
                Optional.empty());
        ImmutableList.Builder<ConnectorSplitsplits = ImmutableList.builder();
        for (int i = 0; i < splitCounti++) {
            splits.add(splitFactory.get());
        }
        SplitSource splitSource = new ConnectorAwareSplitSource("test"new FixedSplitSource(nullsplits.build()));
        return new StageExecutionPlan(testFragment,
                Optional.of(splitSource),
                ImmutableList.<StageExecutionPlan>of()
        );
    }
    private static class MockRemoteTaskFactory
            implements RemoteTaskFactory
    {
        private final Executor executor;
        private MockRemoteTaskFactory(Executor executor)
        {
            this. = executor;
        }
        @Override
        public RemoteTask createRemoteTask(
                Session session,
                TaskId taskId,
                Node node,
                PlanFragment fragment,
                Multimap<PlanNodeIdSplitinitialSplits,
                OutputBuffers outputBuffers)
        {
            return new MockRemoteTask(taskIdnode.getNodeIdentifier(), initialSplits);
        }
        private static class MockRemoteTask
                implements RemoteTask
        {
            private final AtomicLong nextTaskInfoVersion = new AtomicLong(.);
            private final URI location;
            private final TaskStateMachine taskStateMachine;
            private final TaskContext taskContext;
            private final SharedBuffer sharedBuffer;
            private final String nodeId;
            @GuardedBy("this")
            private final Set<PlanNodeIdnoMoreSplits = new HashSet<>();
            @GuardedBy("this")
            private final Multimap<PlanNodeIdSplitsplits = HashMultimap.create();
            public MockRemoteTask(TaskId taskId,
                    String nodeId,
                    Executor executor,
                    Multimap<PlanNodeIdSplitinitialSplits)
            {
                this. = new TaskStateMachine(checkNotNull(taskId"taskId is null"), checkNotNull(executor"executor is null"));
                MemoryPool memoryPool = new MemoryPool(new MemoryPoolId("test"), new DataSize(1, ), false);
                this. = new QueryContext(falsenew DataSize(1, ), memoryPoolexecutor).addTaskContext(new DataSize(256, ), new DataSize(1, ), truetrue);
                this. = URI.create("fake://task/" + taskId);
                this. = new SharedBuffer(taskIdexecutorcheckNotNull(new DataSize(1, .), "maxBufferSize is null"));
                this. = nodeId;
                .putAll(initialSplits);
            }
            @Override
            public String getNodeId()
            {
                return ;
            }
            @Override
            public TaskInfo getTaskInfo()
            {
                TaskState state = .getState();
                List<ExecutionFailureInfofailures = ImmutableList.of();
                if (state == .) {
                    failures = toFailures(.getFailureCauses());
                }
                return new TaskInfo(
                        .getTaskId(),
                        Optional.empty(),
                        .getAndIncrement(),
                        state,
                        ,
                        DateTime.now(),
                        .getInfo(),
                        ImmutableSet.<PlanNodeId>of(),
                        .getTaskStats(),
                        failures);
            }
            public void finished()
            {
                .finished();
            }
            @Override
            public void start()
            {
            }
            @Override
            public void addSplits(PlanNodeId sourceIdIterable<Splitsplits)
            {
                checkNotNull(splits"splits is null");
                for (Split split : splits) {
                    this..put(sourceIdsplit);
                }
            }
            @Override
            public void noMoreSplits(PlanNodeId sourceId)
            {
                .add(sourceId);
            }
            @Override
            public void setOutputBuffers(OutputBuffers outputBuffers)
            {
                .setOutputBuffers(outputBuffers);
            }
            @Override
            public void addStateChangeListener(StateChangeListener<TaskInfostateChangeListener)
            {
                .addStateChangeListener(newValue -> stateChangeListener.stateChanged(getTaskInfo()));
            }
            @Override
            public void cancel()
            {
                .cancel();
            }
            @Override
            public void abort()
            {
                .abort();
            }
            @Override
            public int getPartitionedSplitCount()
            {
                if (.getState().isDone()) {
                    return 0;
                }
                return .size();
            }
            @Override
            public int getQueuedPartitionedSplitCount()
            {
                return getPartitionedSplitCount();
            }
        }
    }
New to GrepCode? Check out our FAQ X