/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.execution;
 
 
 
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.net.InetAddresses;
import org.weakref.jmx.Managed;

import javax.inject.Inject;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// NodeManager, NodeSchedulerConfig, NodeTaskMap, RemoteTask and Split are Presto-internal types;
// their imports were not part of this snippet and are assumed to resolve from the surrounding codebase.

import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.util.Failures.checkCondition;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
 
public class NodeScheduler
{
    private final String coordinatorNodeId;
    private final NodeManager nodeManager;
    private final AtomicLong scheduleLocal = new AtomicLong();
    private final AtomicLong scheduleRack = new AtomicLong();
    private final AtomicLong scheduleRandom = new AtomicLong();
    private final int minCandidates;
    private final boolean locationAwareScheduling;
    private final boolean includeCoordinator;
    private final int maxSplitsPerNode;
    private final int maxSplitsPerNodePerTaskWhenFull;
    private final NodeTaskMap nodeTaskMap;
    private final boolean doubleScheduling;
    @Inject
    public NodeScheduler(NodeManager nodeManager, NodeSchedulerConfig config, NodeTaskMap nodeTaskMap)
    {
        this.nodeManager = nodeManager;
        this.coordinatorNodeId = nodeManager.getCurrentNode().getNodeIdentifier();
        this.minCandidates = config.getMinCandidates();
        this.locationAwareScheduling = config.isLocationAwareSchedulingEnabled();
        this.includeCoordinator = config.isIncludeCoordinator();
        this.doubleScheduling = config.isMultipleTasksPerNodeEnabled();
        this.maxSplitsPerNode = config.getMaxSplitsPerNode();
        // assignment for maxSplitsPerNodePerTaskWhenFull was missing from this snippet; the config getter name is assumed
        this.maxSplitsPerNodePerTaskWhenFull = config.getMaxPendingSplitsPerNodePerTask();
        this.nodeTaskMap = checkNotNull(nodeTaskMap, "nodeTaskMap is null");
        checkArgument(maxSplitsPerNode > maxSplitsPerNodePerTaskWhenFull, "maxSplitsPerNode must be > maxSplitsPerNodePerTaskWhenFull");
    }
 
    @Managed
    public long getScheduleLocal()
    {
        return scheduleLocal.get();
    }

    @Managed
    public long getScheduleRack()
    {
        return scheduleRack.get();
    }

    @Managed
    public long getScheduleRandom()
    {
        return scheduleRandom.get();
    }

    @Managed
    public void reset()
    {
        scheduleLocal.set(0);
        scheduleRack.set(0);
        scheduleRandom.set(0);
    }

    public NodeSelector createNodeSelector(String dataSourceName)
    {
        // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run on should be
        // done as close as possible to when the split is about to be scheduled
        Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(() -> {
            ImmutableSetMultimap.Builder<HostAddress, Node> byHostAndPort = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<InetAddress, Node> byHost = ImmutableSetMultimap.builder();
            ImmutableSetMultimap.Builder<Rack, Node> byRack = ImmutableSetMultimap.builder();

            Set<Node> nodes;
            if (dataSourceName != null) {
                nodes = nodeManager.getActiveDatasourceNodes(dataSourceName);
            }
            else {
                nodes = nodeManager.getActiveNodes();
            }

            for (Node node : nodes) {
                try {
                    byHostAndPort.put(node.getHostAndPort(), node);

                    InetAddress host = InetAddress.getByName(node.getHttpUri().getHost());
                    byHost.put(host, node);

                    byRack.put(Rack.of(host), node);
                }
                catch (UnknownHostException e) {
                    // ignore nodes whose host name does not resolve
                }
            }

            return new NodeMap(byHostAndPort.build(), byHost.build(), byRack.build());
        }, 5, TimeUnit.SECONDS);

        return new NodeSelector(nodeMap);
    }
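
    /*
     * Illustrative usage sketch (hypothetical caller and variable names, shown only to clarify the call
     * pattern): a coordinator-side scheduler would typically create one NodeSelector per connector data
     * source and reuse it while scheduling a stage, e.g.
     *
     *     NodeSelector selector = nodeScheduler.createNodeSelector("tpch");
     *     List<Node> workers = selector.selectRandomNodes(10);
     *     selector.lockDownNodes();   // freeze the memoized node map for the rest of the stage
     *
     * Because the NodeMap supplier above is memoized with expiration, the selector re-reads the active
     * node set at most every 5 seconds until lockDownNodes() pins a snapshot.
     */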

    public class NodeSelector
    {
        private final AtomicReference<Supplier<NodeMap>> nodeMap;

        public NodeSelector(Supplier<NodeMap> nodeMap)
        {
            this.nodeMap = new AtomicReference<>(nodeMap);
        }

        public void lockDownNodes()
        {
            nodeMap.set(Suppliers.ofInstance(nodeMap.get().get()));
        }

        public List<Node> allNodes()
        {
            return ImmutableList.copyOf(nodeMap.get().get().getNodesByHostAndPort().values());
        }

        public Node selectCurrentNode()
        {
            // TODO: this is a hack to force scheduling on the coordinator
            return nodeManager.getCurrentNode();
        }

        public List<Node> selectRandomNodes(int limit)
        {
            checkArgument(limit > 0, "limit must be at least 1");

            List<Node> selected = new ArrayList<>(limit);
            for (Node node : lazyShuffle(nodeMap.get().get().getNodesByHostAndPort().values())) {
                if (includeCoordinator || !coordinatorNodeId.equals(node.getNodeIdentifier())) {
                    selected.add(node);
                }
                if (selected.size() >= limit) {
                    break;
                }
            }

            if (doubleScheduling && !selected.isEmpty()) {
                // Cycle the nodes until we reach the limit
                int uniqueNodes = selected.size();
                int i = 0;
                while (selected.size() < limit) {
                    if (i >= uniqueNodes) {
                        i = 0;
                    }
                    selected.add(selected.get(i));
                    i++;
                }
            }
            return selected;
        }
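
        /*
         * Example of the cycling above (hypothetical values): with three distinct eligible nodes
         * [A, B, C] and limit = 7, the returned list is [A, B, C, A, B, C, A], so the same node can be
         * handed out multiple times when multiple-tasks-per-node (doubleScheduling) is enabled.
         */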

        
        /**
         * Identifies the nodes for running the specified splits.
         *
         * @param splits the splits that need to be assigned to nodes
         * @return a multimap from node to splits only for splits for which we could identify a node to schedule on.
         *         If we cannot find an assignment for a split, it is not included in the map.
         */
        public Multimap<Node, Split> computeAssignments(Set<Split> splits, Iterable<RemoteTask> existingTasks)
        {
            Multimap<Node, Split> assignment = HashMultimap.create();
            Map<Node, Integer> assignmentCount = new HashMap<>();

            // maintain a temporary local cache of partitioned splits on the node
            Map<Node, Integer> splitCountByNode = new HashMap<>();

            Map<String, Integer> queuedSplitCountByNode = new HashMap<>();

            for (RemoteTask task : existingTasks) {
                String nodeId = task.getNodeId();
                queuedSplitCountByNode.put(nodeId, queuedSplitCountByNode.getOrDefault(nodeId, 0) + task.getQueuedPartitionedSplitCount());
            }

            for (Split split : splits) {
                List<Node> candidateNodes;
                if (locationAwareScheduling || !split.isRemotelyAccessible()) {
                    candidateNodes = selectCandidateNodes(nodeMap.get().get(), split);
                }
                else {
                    candidateNodes = selectRandomNodes(minCandidates);
                }
                checkCondition(!candidateNodes.isEmpty(), NO_NODES_AVAILABLE, "No nodes available to run query");

                // compute and cache number of splits currently assigned to each node
                candidateNodes.stream()
                        .filter(node -> !splitCountByNode.containsKey(node))
                        .forEach(node -> splitCountByNode.put(node, nodeTaskMap.getPartitionedSplitsOnNode(node)));

                // first pass: prefer the least loaded candidate that is still below maxSplitsPerNode
                Node chosenNode = null;
                int min = Integer.MAX_VALUE;

                for (Node node : candidateNodes) {
                    int totalSplitCount = assignmentCount.getOrDefault(node, 0) + splitCountByNode.get(node);

                    if (totalSplitCount < min && totalSplitCount < maxSplitsPerNode) {
                        chosenNode = node;
                        min = totalSplitCount;
                    }
                }

                // second pass: if every candidate is full, fall back to the node with the fewest queued
                // splits, bounded by maxSplitsPerNodePerTaskWhenFull
                if (chosenNode == null) {
                    for (Node node : candidateNodes) {
                        int assignedSplitCount = assignmentCount.getOrDefault(node, 0);
                        int queuedSplitCount = queuedSplitCountByNode.getOrDefault(node.getNodeIdentifier(), 0);
                        int totalSplitCount = queuedSplitCount + assignedSplitCount;

                        if (totalSplitCount < min && totalSplitCount < maxSplitsPerNodePerTaskWhenFull) {
                            chosenNode = node;
                            min = totalSplitCount;
                        }
                    }
                }

                if (chosenNode != null) {
                    assignment.put(chosenNode, split);
                    assignmentCount.put(chosenNode, assignmentCount.getOrDefault(chosenNode, 0) + 1);
                }
            }
            return assignment;
        }
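
        /*
         * Worked example of the two-pass selection above (hypothetical numbers): suppose
         * maxSplitsPerNode = 100, maxSplitsPerNodePerTaskWhenFull = 10, and the candidates for a split
         * are node A with 40 active splits and node B with 100. The first pass skips B (already at the
         * limit) and picks A. If both nodes were at 100 active splits, the second pass would instead
         * pick the node with the fewest queued splits, provided it has fewer than 10; if neither
         * qualifies, the split is left out of the returned multimap, as described in the Javadoc.
         */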

        private List<Node> selectCandidateNodes(NodeMap nodeMap, Split split)
        {
            Set<Node> chosen = new LinkedHashSet<>();
            String coordinatorIdentifier = nodeManager.getCurrentNode().getNodeIdentifier();

            // first look for nodes that match the hint
            for (HostAddress hint : split.getAddresses()) {
                nodeMap.getNodesByHostAndPort().get(hint).stream()
                        .filter(node -> includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier()))
                        .filter(chosen::add)
                        .forEach(node -> scheduleLocal.incrementAndGet());

                InetAddress address;
                try {
                    address = hint.toInetAddress();
                }
                catch (UnknownHostException e) {
                    // skip addresses that don't resolve
                    continue;
                }

                // consider a split with a host hint without a port as being accessible
                // by all nodes in that host
                if (!hint.hasPort() || split.isRemotelyAccessible()) {
                    nodeMap.getNodesByHost().get(address).stream()
                            .filter(node -> includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier()))
                            .filter(chosen::add)
                            .forEach(node -> scheduleLocal.incrementAndGet());
                }
            }

            // add nodes in same rack, if below the minimum count
            if (split.isRemotelyAccessible() && chosen.size() < minCandidates) {
                for (HostAddress hint : split.getAddresses()) {
                    InetAddress address;
                    try {
                        address = hint.toInetAddress();
                    }
                    catch (UnknownHostException e) {
                        // skip addresses that don't resolve
                        continue;
                    }
                    for (Node node : nodeMap.getNodesByRack().get(Rack.of(address))) {
                        if (includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
                            if (chosen.add(node)) {
                                scheduleRack.incrementAndGet();
                            }
                            if (chosen.size() == minCandidates) {
                                break;
                            }
                        }
                    }
                    if (chosen.size() == minCandidates) {
                        break;
                    }
                }
            }

            // add some random nodes if below the minimum count
            if (split.isRemotelyAccessible()) {
                if (chosen.size() < minCandidates) {
                    for (Node node : lazyShuffle(nodeMap.getNodesByHost().values())) {
                        if (includeCoordinator || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
                            if (chosen.add(node)) {
                                scheduleRandom.incrementAndGet();
                            }
                            if (chosen.size() == minCandidates) {
                                break;
                            }
                        }
                    }
                }
            }

            // if the chosen set is empty and the hint includes the coordinator,
            // force pick the coordinator
            if (chosen.isEmpty() && !includeCoordinator) {
                HostAddress coordinatorHostAddress = nodeManager.getCurrentNode().getHostAndPort();
                if (split.getAddresses().stream().anyMatch(host -> canSplitRunOnHost(split, coordinatorHostAddress, host))) {
                    chosen.add(nodeManager.getCurrentNode());
                }
            }

            return ImmutableList.copyOf(chosen);
        }

        private boolean canSplitRunOnHost(Split split, HostAddress coordinatorHost, HostAddress host)
        {
            // Exact match of the coordinator
            if (host.equals(coordinatorHost)) {
                return true;
            }
            // If the split is remotely accessible or the split location doesn't specify a port,
            // we can ignore the coordinator's port and match just the ip address
            return (!host.hasPort() || split.isRemotelyAccessible()) &&
                    host.getHostText().equals(coordinatorHost.getHostText());
        }
    }

    private static <T> Iterable<T> lazyShuffle(Iterable<T> iterable)
    {
        return new Iterable<T>()
        {
            @Override
            public Iterator<T> iterator()
            {
                return new AbstractIterator<T>()
                {
                    private final List<T> list = Lists.newArrayList(iterable);
                    private int limit = list.size();

                    @Override
                    protected T computeNext()
                    {
                        if (limit == 0) {
                            return endOfData();
                        }

                        // pick a random element from the not-yet-returned prefix and swap it to the end (lazy Fisher-Yates shuffle)
                        int position = ThreadLocalRandom.current().nextInt(limit);

                        T result = list.get(position);
                        list.set(position, list.get(limit - 1));
                        limit--;

                        return result;
                    }
                };
            }
        };
    }

    private static class NodeMap
    {
        private final SetMultimap<HostAddress, Node> nodesByHostAndPort;
        private final SetMultimap<InetAddress, Node> nodesByHost;
        private final SetMultimap<Rack, Node> nodesByRack;

        public NodeMap(SetMultimap<HostAddress, Node> nodesByHostAndPort, SetMultimap<InetAddress, Node> nodesByHost, SetMultimap<Rack, Node> nodesByRack)
        {
            this.nodesByHostAndPort = nodesByHostAndPort;
            this.nodesByHost = nodesByHost;
            this.nodesByRack = nodesByRack;
        }

        private SetMultimap<HostAddress, Node> getNodesByHostAndPort()
        {
            return nodesByHostAndPort;
        }

        public SetMultimap<InetAddress, Node> getNodesByHost()
        {
            return nodesByHost;
        }

        public SetMultimap<Rack, Node> getNodesByRack()
        {
            return nodesByRack;
        }
    }

    private static class Rack
    {
        private final int id;

        public static Rack of(InetAddress address)
        {
            // TODO: we need a plugin for this
            // approximate a rack by the /24 prefix of the IPv4 address, e.g. 10.1.2.17 and 10.1.2.200 map to the same rack
            int id = InetAddresses.coerceToInteger(address) & 0xFF_FF_FF_00;
            return new Rack(id);
        }

        private Rack(int id)
        {
            this.id = id;
        }

        @Override
        public boolean equals(Object o)
        {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            Rack rack = (Rack) o;

            if (id != rack.id) {
                return false;
            }
            return true;
        }

        @Override
        public int hashCode()
        {
            return id;
        }
    }
}