Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 @Test(singleThreaded = true)
 public class TestNodeScheduler
 {
     private NodeTaskMap nodeTaskMap;
     private InMemoryNodeManager nodeManager;
     private Map<NodeRemoteTasktaskMap;
 
     /**
      * Rebuilds the scheduler fixture before every test method.
      *
      * <p>Visible steps: creates a {@code NodeTaskMap} and an {@code InMemoryNodeManager},
      * builds three {@code PrestoNode}s ("other1".."other3" on 127.0.0.1 ports 11-13) and
      * hands them to an {@code addNode} call for "foo", configures a
      * {@code NodeSchedulerConfig} (20 max splits per node, coordinator excluded, 10 max
      * pending splits per node per task), creates a node selector for "foo", a
      * {@code HashMap}, and a cached thread pool using the "remoteTaskExecutor-%s" name pattern.
      *
      * <p>NOTE(review): this listing lost tokens during extraction (assignment targets,
      * method receivers, separating commas, the closing {@code >} of generics, and the
      * third {@code PrestoNode} argument). The code is left exactly as found; the inline
      * notes mark the gaps and must be confirmed against the real source.
      *
      * @throws Exception if building the fixture fails
      */
     @BeforeMethod
     public void setUp()
             throws Exception
     {
         // NOTE(review): assignment target missing; presumably the nodeTaskMap field - confirm
          = new NodeTaskMap();
         // NOTE(review): assignment target missing; presumably the nodeManager field - confirm
          = new InMemoryNodeManager();
 
         ImmutableList.Builder<NodenodeBuilder = ImmutableList.builder();
         // NOTE(review): the third constructor argument is reduced to "." in this listing - confirm
         nodeBuilder.add(new PrestoNode("other1", URI.create("http://127.0.0.1:11"), .));
         nodeBuilder.add(new PrestoNode("other2", URI.create("http://127.0.0.1:12"), .));
         nodeBuilder.add(new PrestoNode("other3", URI.create("http://127.0.0.1:13"), .));
         ImmutableList<Nodenodes = nodeBuilder.build();
         // NOTE(review): receiver and comma missing; presumably nodeManager.addNode("foo", nodes) - confirm
         .addNode("foo"nodes);
         NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig()
                 .setMaxSplitsPerNode(20)
                 .setIncludeCoordinator(false)
                 .setMaxPendingSplitsPerNodePerTask(10);
 
         // NOTE(review): another test in this class passes three arguments to this
         // constructor; arguments may have been stripped here - confirm
         NodeScheduler nodeScheduler = new NodeScheduler(nodeSchedulerConfig);
         // contents of taskMap indicate the node-task map for the current stage
         // NOTE(review): assignment target missing; presumably taskMap, per the comment above
          = new HashMap<>();
         // NOTE(review): assignment target missing; no matching field declaration is visible in this listing
          = nodeScheduler.createNodeSelector("foo");
         // NOTE(review): assignment target missing; no matching field declaration is visible in this listing
          = Executors.newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s"));
     }
 
     /**
      * Runs after every test method and calls {@code shutdown()} on an object whose name
      * is missing from this listing.
      *
      * <p>NOTE(review): presumably the thread pool created at the end of {@code setUp()};
      * confirm against the real source.
      *
      * @throws Exception declared by the signature
      */
     @AfterMethod
     public void tearDown()
             throws Exception
     {
         // NOTE(review): receiver stripped during extraction
         .shutdown();
     }
 
     /**
      * Builds a scheduler from a config with location-aware scheduling disabled, computes
      * assignments for a single {@code TestSplitLocal}-backed split, takes the only
      * resulting entry, and asserts that the assigned node's host/port equals the split's
      * first address and that the assigned value is the split itself.
      *
      * <p>NOTE(review): the method declaration line (modifiers, return type, name) is
      * missing from this listing, as are several separators and arguments below. The
      * code is left exactly as found; confirm against the real source.
      */
     @Test
             throws Exception
     {
         NodeSchedulerConfig config = new NodeSchedulerConfig()
                 .setMaxSplitsPerNode(20)
                 .setIncludeCoordinator(false)
                 .setLocationAwareSchedulingEnabled(false)
                 .setMaxPendingSplitsPerNodePerTask(10);
 
         // NOTE(review): reads as (this.<field>, config, this.<field>) with both field
         // names and the commas stripped - confirm which fields and in which order
         NodeScheduler scheduler = new NodeScheduler(this.configthis.);
         NodeScheduler.NodeSelector selector = scheduler.createNodeSelector("foo");
        Split split = new Split("foo"new TestSplitLocal());
        Set<Splitsplits = ImmutableSet.of(split);
        // NOTE(review): looks like computeAssignments(splits, <map field>.values()) with the
        // field name and comma stripped - confirm
        Map.Entry<NodeSplitassignment = Iterables.getOnlyElement(selector.computeAssignments(splits.values()).entries());
        assertEquals(assignment.getKey().getHostAndPort(), split.getAddresses().get(0));
        assertEquals(assignment.getValue(), split);
    }
    /**
     * Computes assignments for a single {@code TestSplitLocal}-backed split, takes the
     * only resulting entry, and asserts that the assigned node's host/port equals the
     * split's first address and that the assigned value is the split itself.
     *
     * <p>NOTE(review): separators and the receiver of {@code computeAssignments} were
     * stripped from this listing; the code is left exactly as found.
     *
     * @throws Exception declared by the signature
     */
    @Test
    public void testScheduleLocal()
            throws Exception
    {
        Split split = new Split("foo"new TestSplitLocal());
        Set<Splitsplits = ImmutableSet.of(split);
        // NOTE(review): receiver missing; presumably the selector created in setUp() - confirm
        Map.Entry<NodeSplitassignment = Iterables.getOnlyElement(.computeAssignments(splits.values()).entries());
        assertEquals(assignment.getKey().getHostAndPort(), split.getAddresses().get(0));
        assertEquals(assignment.getValue(), split);
    }
    @Test
    public void testMultipleTasksPerNode()
    {
        NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig()
                .setMaxSplitsPerNode(20)
                .setIncludeCoordinator(false)
                .setMaxPendingSplitsPerNodePerTask(10);
        NodeScheduler nodeScheduler = new NodeScheduler(nodeSchedulerConfig);
        NodeScheduler.NodeSelector nodeSelector = nodeScheduler.createNodeSelector("foo");
        List<Nodenodes = nodeSelector.selectRandomNodes(10);
        assertEquals(nodes.size(), 3);
        nodeSchedulerConfig.setMultipleTasksPerNodeEnabled(true);
        nodeScheduler = new NodeScheduler(nodeSchedulerConfig);
        nodeSelector = nodeScheduler.createNodeSelector("foo");
        nodes = nodeSelector.selectRandomNodes(9);
        assertEquals(nodes.size(), 9);
        Map<StringIntegercounts = new HashMap<>();
        for (Node node : nodes) {
            Integer value = counts.get(node.getNodeIdentifier());
            counts.put(node.getNodeIdentifier(), (value == null ? 0 : value) + 1);
        }
        assertEquals(counts.get("other1").intValue(), 3);
        assertEquals(counts.get("other2").intValue(), 3);
        assertEquals(counts.get("other3").intValue(), 3);
    }
    /**
     * Computes assignments for a set holding one {@code TestSplitRemote}-backed split and
     * asserts that the resulting multimap has size 1.
     *
     * <p>NOTE(review): separators and the receiver of {@code computeAssignments} were
     * stripped from this listing; the code is left exactly as found.
     *
     * @throws Exception declared by the signature
     */
    @Test
    public void testScheduleRemote()
            throws Exception
    {
        Set<Splitsplits = new HashSet<>();
        splits.add(new Split("foo"new TestSplitRemote()));
        // NOTE(review): receiver missing; presumably the selector created in setUp() - confirm
        Multimap<NodeSplitassignments = .computeAssignments(splits.values());
        assertEquals(assignments.size(), 1);
    }
    /**
     * Computes assignments for three {@code TestSplitRemote}-backed splits, asserts that
     * there are three assignment entries, and asserts that every node returned by
     * {@code getActiveDatasourceNodes("foo")} appears among the assignment keys.
     *
     * <p>NOTE(review): separators and method receivers were stripped from this listing;
     * the code is left exactly as found.
     *
     * @throws Exception declared by the signature
     */
    @Test
    public void testBasicAssignment()
            throws Exception
    {
        // One split for each node
        Set<Splitsplits = new HashSet<>();
        for (int i = 0; i < 3; i++) {
            splits.add(new Split("foo"new TestSplitRemote()));
        }
        // NOTE(review): receiver missing; presumably the selector created in setUp() - confirm
        Multimap<NodeSplitassignments = .computeAssignments(splits.values());
        assertEquals(assignments.entries().size(), 3);
        // NOTE(review): receiver missing; presumably the nodeManager field - confirm
        for (Node node : .getActiveDatasourceNodes("foo")) {
            assertTrue(assignments.keySet().contains(node));
        }
    }
    /**
     * Registers an extra node "other4" (127.0.0.1:14), creates two table-scan tasks on it
     * that are each built from the same 10 initial splits, then computes assignments for
     * 5 further splits and asserts that none of them is assigned to the new node.
     *
     * <p>The two tasks together carry 20 splits, which matches the
     * {@code setMaxSplitsPerNode(20)} value configured in {@code setUp()}.
     *
     * <p>NOTE(review): separators, method receivers and the third {@code PrestoNode}
     * argument were stripped from this listing; the code is left exactly as found.
     *
     * @throws Exception declared by the signature
     */
    @Test
    public void testMaxSplitsPerNode()
            throws Exception
    {
        Node newNode = new PrestoNode("other4", URI.create("http://127.0.0.1:14"), .);
        // NOTE(review): receiver missing; presumably the nodeManager field - confirm
        .addNode("foo"newNode);
        ImmutableList.Builder<SplitinitialSplits = ImmutableList.builder();
        for (int i = 0; i < 10; i++) {
            initialSplits.add(new Split("foo"new TestSplitRemote()));
        }
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory();
        // Max out number of splits on node
        RemoteTask remoteTask1 = remoteTaskFactory.createTableScanTask(newNodeinitialSplits.build());
        // NOTE(review): receiver missing; presumably the nodeTaskMap field - confirm
        .addTask(newNoderemoteTask1);
        RemoteTask remoteTask2 = remoteTaskFactory.createTableScanTask(newNodeinitialSplits.build());
        .addTask(newNoderemoteTask2);
        Set<Splitsplits = new HashSet<>();
        for (int i = 0; i < 5; i++) {
            splits.add(new Split("foo"new TestSplitRemote()));
        }
        // NOTE(review): receiver missing; presumably the selector created in setUp() - confirm
        Multimap<NodeSplitassignments = .computeAssignments(splits.values());
        // no split should be assigned to the newNode, as it already has maxNodeSplits assigned to it
        assertFalse(assignments.keySet().contains(newNode));
    }
    /**
     * Registers an extra node "other4" (127.0.0.1:14), creates a table-scan task built
     * from 20 initial splits for every node returned by
     * {@code getActiveDatasourceNodes("foo")}, creates one more such task on the new node
     * and records it via {@code put} and {@code addTask}, then computes assignments for
     * 5 further splits. Asserts that the assignments cover exactly three nodes and that
     * the new node is not among them.
     *
     * <p>NOTE(review): separators, method receivers and the third {@code PrestoNode}
     * argument were stripped from this listing; the code is left exactly as found.
     *
     * @throws Exception declared by the signature
     */
    @Test
    public void testMaxSplitsPerNodePerTask()
            throws Exception
    {
        Node newNode = new PrestoNode("other4", URI.create("http://127.0.0.1:14"), .);
        // NOTE(review): receiver missing; presumably the nodeManager field - confirm
        .addNode("foo"newNode);
        ImmutableList.Builder<SplitinitialSplits = ImmutableList.builder();
        for (int i = 0; i < 20; i++) {
            initialSplits.add(new Split("foo"new TestSplitRemote()));
        }
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory();
        for (Node node : .getActiveDatasourceNodes("foo")) {
            // Max out number of splits on node
            RemoteTask remoteTask = remoteTaskFactory.createTableScanTask(nodeinitialSplits.build());
            // NOTE(review): receiver missing; presumably the nodeTaskMap field - confirm
            .addTask(noderemoteTask);
        }
        RemoteTask newRemoteTask = remoteTaskFactory.createTableScanTask(newNodeinitialSplits.build());
        // Max out pending splits on new node
        // NOTE(review): receiver missing; presumably the taskMap field (a Map keyed by node) - confirm
        .put(newNodenewRemoteTask);
        .addTask(newNodenewRemoteTask);
        Set<Splitsplits = new HashSet<>();
        for (int i = 0; i < 5; i++) {
            splits.add(new Split("foo"new TestSplitRemote()));
        }
        // NOTE(review): receiver missing; presumably the selector created in setUp() - confirm
        Multimap<NodeSplitassignments = .computeAssignments(splits.values());
        // no split should be assigned to the newNode, as it already has
        // maxSplitsPerNode + maxSplitsPerNodePerTask assigned to it
        assertEquals(assignments.keySet().size(), 3); // Splits should be scheduled on the other three nodes
        assertFalse(assignments.keySet().contains(newNode)); // No splits scheduled on the maxed out node
    }
    /**
     * Creates a one-split table-scan task on the first node returned by
     * {@code getActiveDatasourceNodes("foo")}, registers it via {@code addTask}, and
     * asserts that {@code getPartitionedSplitsOnNode} reports 1 for that node. It then
     * aborts the task, sleeps, and asserts that the reported count has dropped to 0.
     *
     * <p>NOTE(review): separators and method receivers (including the receiver chain of
     * {@code sleep(100)}) were stripped from this listing; the code is left exactly as
     * found. The unit of the sleep argument cannot be determined from this listing.
     *
     * @throws Exception declared by the signature
     */
    @Test
    public void testTaskCompletion()
            throws Exception
    {
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory();
        // NOTE(review): receiver missing; presumably the nodeManager field - confirm
        Node chosenNode = Iterables.get(.getActiveDatasourceNodes("foo"), 0);
        RemoteTask remoteTask = remoteTaskFactory.createTableScanTask(chosenNode, ImmutableList.of(new Split("foo"new TestSplitRemote())));
        // NOTE(review): receiver missing here and on getPartitionedSplitsOnNode;
        // presumably the nodeTaskMap field - confirm
        .addTask(chosenNoderemoteTask);
        assertEquals(.getPartitionedSplitsOnNode(chosenNode), 1);
        remoteTask.abort();
        ..sleep(100); // Sleep until cache expires
        assertEquals(.getPartitionedSplitsOnNode(chosenNode), 0);
    }
    private class TestSplitLocal
            implements ConnectorSplit
    {
        @Override
        public boolean isRemotelyAccessible()
        {
            return false;
        }
        @Override
        public List<HostAddressgetAddresses()
        {
            return ImmutableList.of(HostAddress.fromString("127.0.0.1:11"));
        }
        @Override
        public Object getInfo()
        {
            return this;
        }
    }
    private class TestSplitRemote
            implements ConnectorSplit
    {
        @Override
        public boolean isRemotelyAccessible()
        {
            return true;
        }
        @Override
        public List<HostAddressgetAddresses()
        {
            int randomPort = ThreadLocalRandom.current().nextInt(5000);
            return ImmutableList.of(HostAddress.fromString("127.0.0.1:" + randomPort));
        }
        @Override
        public Object getInfo()
        {
            return this;
        }
    }
New to GrepCode? Check out our FAQ X