Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.execution;
 
 
 
 import com.facebook.presto.metadata.NodeManager;
 import com.facebook.presto.metadata.Split;
 import com.facebook.presto.spi.HostAddress;
 import com.facebook.presto.spi.Node;
 import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSetMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.SetMultimap;
 import com.google.common.net.InetAddresses;
 import org.weakref.jmx.Managed;
 
 import javax.inject.Inject;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
 import static com.facebook.presto.util.Failures.checkCondition;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 public class NodeScheduler
 {
     private final NodeManager nodeManager;
     private final AtomicLong scheduleLocal = new AtomicLong();
     private final AtomicLong scheduleRack = new AtomicLong();
     private final AtomicLong scheduleRandom = new AtomicLong();
     private final int minCandidates;
     private final boolean locationAwareScheduling;
     private final boolean includeCoordinator;
     private final int maxSplitsPerNode;
     private final int maxSplitsPerNodePerTaskWhenFull;
     private final NodeTaskMap nodeTaskMap;
     private final boolean doubleScheduling;
 
     @Inject
     public NodeScheduler(NodeManager nodeManagerNodeSchedulerConfig configNodeTaskMap nodeTaskMap)
     {
         this. = nodeManager;
         this. = config.getMinCandidates();
         this. = config.isLocationAwareSchedulingEnabled();
         this. = config.isIncludeCoordinator();
         this. = config.isMultipleTasksPerNodeEnabled();
         this. = config.getMaxSplitsPerNode();
         this. = checkNotNull(nodeTaskMap"nodeTaskMap is null");
         checkArgument( > "maxSplitsPerNode must be > maxSplitsPerNodePerTaskWhenFull");
     }
 
     @Managed
     public long getScheduleLocal()
     {
         return .get();
     }
 
     @Managed
     public long getScheduleRack()
     {
         return .get();
     }
 
     @Managed
     public long getScheduleRandom()
     {
         return .get();
     }
 
    @Managed
    public void reset()
    {
        .set(0);
        .set(0);
        .set(0);
    }
    public NodeSelector createNodeSelector(final String dataSourceName)
    {
        // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
        // done as close to when the the split is about to be scheduled
        Supplier<NodeMapnodeMap = Suppliers.memoizeWithExpiration(new Supplier<NodeMap>()
        {
            @Override
            public NodeMap get()
            {
                ImmutableSetMultimap.Builder<HostAddressNodebyHostAndPort = ImmutableSetMultimap.builder();
                ImmutableSetMultimap.Builder<InetAddressNodebyHost = ImmutableSetMultimap.builder();
                ImmutableSetMultimap.Builder<RackNodebyRack = ImmutableSetMultimap.builder();
                Set<Nodenodes;
                if (dataSourceName != null) {
                    nodes = .getActiveDatasourceNodes(dataSourceName);
                }
                else {
                    nodes = .getActiveNodes();
                }
                for (Node node : nodes) {
                    try {
                        byHostAndPort.put(node.getHostAndPort(), node);
                        InetAddress host = InetAddress.getByName(node.getHttpUri().getHost());
                        byHost.put(hostnode);
                        byRack.put(Rack.of(host), node);
                    }
                    catch (UnknownHostException e) {
                        // ignore
                    }
                }
                return new NodeMap(byHostAndPort.build(), byHost.build(), byRack.build());
            }
        }, 5, .);
        return new NodeSelector(nodeMap);
    }
    public class NodeSelector
    {
        private final AtomicReference<Supplier<NodeMap>> nodeMap;
        public NodeSelector(Supplier<NodeMapnodeMap)
        {
            this. = new AtomicReference<>(nodeMap);
        }
        public void lockDownNodes()
        {
            .set(Suppliers.ofInstance(.get().get()));
        }
        public List<NodeallNodes()
        {
            return ImmutableList.copyOf(.get().get().getNodesByHostAndPort().values());
        }
        public Node selectCurrentNode()
        {
            // TODO: this is a hack to force scheduling on the coordinator
            return .getCurrentNode();
        }
        public List<NodeselectRandomNodes(int limit)
        {
            checkArgument(limit > 0, "limit must be at least 1");
            final String coordinatorIdentifier = .getCurrentNode().getNodeIdentifier();
            FluentIterable<Nodenodes = FluentIterable.from(lazyShuffle(.get().get().getNodesByHostAndPort().values()))
                    .filter(new Predicate<Node>()
                    {
                        @Override
                        public boolean apply(Node node)
                        {
                            return  || !coordinatorIdentifier.equals(node.getNodeIdentifier());
                        }
                    });
            if () {
                nodes = nodes.cycle();
            }
            return nodes.limit(limit).toList();
        }

        
Identifies the nodes for runnning the specified splits.

Parameters:
splits the splits that need to be assigned to nodes
Returns:
a multimap from node to splits only for splits for which we could identify a node to schedule on. If we cannot find an assignment for a split, it is not included in the map.
        public Multimap<NodeSplitcomputeAssignments(Set<SplitsplitsIterable<RemoteTaskexistingTasks)
        {
            Multimap<NodeSplitassignment = HashMultimap.create();
            Map<NodeIntegerassignmentCount = new HashMap<>();
            // maintain a temporary local cache of partitioned splits on the node
            Map<NodeIntegersplitCountByNode = new HashMap<>();
            Map<StringIntegerqueuedSplitCountByNode = new HashMap<>();
            for (RemoteTask task : existingTasks) {
                String nodeId = task.getNodeId();
                if (!queuedSplitCountByNode.containsKey(nodeId)) {
                    queuedSplitCountByNode.put(nodeId, 0);
                }
                queuedSplitCountByNode.put(nodeIdqueuedSplitCountByNode.get(nodeId) + task.getQueuedPartitionedSplitCount());
            }
            for (Split split : splits) {
                List<NodecandidateNodes;
                if ( || !split.isRemotelyAccessible()) {
                    candidateNodes = selectCandidateNodes(.get().get(), split);
                }
                else {
                    candidateNodes = selectRandomNodes();
                }
                checkCondition(!candidateNodes.isEmpty(), "No nodes available to run query");
                // compute and cache number of splits currently assigned to each node
                for (Node node : candidateNodes) {
                    if (!splitCountByNode.containsKey(node)) {
                        splitCountByNode.put(node.getPartitionedSplitsOnNode(node));
                    }
                }
                Node chosenNode = null;
                int min = .;
                for (Node node : candidateNodes) {
                    int assignedSplitCount = assignmentCount.containsKey(node) ? assignmentCount.get(node) : 0;
                    int totalSplitCount = assignedSplitCount + splitCountByNode.get(node);
                    if (totalSplitCount < min && totalSplitCount < ) {
                        chosenNode = node;
                        min = totalSplitCount;
                    }
                }
                if (chosenNode == null) {
                    for (Node node : candidateNodes) {
                        int assignedSplitCount = assignmentCount.containsKey(node) ? assignmentCount.get(node) : 0;
                        int queuedSplitCount = 0;
                        if (queuedSplitCountByNode.containsKey(node.getNodeIdentifier())) {
                            queuedSplitCount = queuedSplitCountByNode.get(node.getNodeIdentifier());
                        }
                        int totalSplitCount = queuedSplitCount + assignedSplitCount;
                        if (totalSplitCount < min && totalSplitCount < ) {
                            chosenNode = node;
                            min = totalSplitCount;
                        }
                    }
                }
                if (chosenNode != null) {
                    assignment.put(chosenNodesplit);
                    int count = assignmentCount.containsKey(chosenNode) ? assignmentCount.get(chosenNode) : 0;
                    assignmentCount.put(chosenNodecount + 1);
                }
            }
            return assignment;
        }
        private List<NodeselectCandidateNodes(NodeMap nodeMapfinal Split split)
        {
            Set<Nodechosen = new LinkedHashSet<>();
            String coordinatorIdentifier = .getCurrentNode().getNodeIdentifier();
            // first look for nodes that match the hint
            for (HostAddress hint : split.getAddresses()) {
                for (Node node : nodeMap.getNodesByHostAndPort().get(hint)) {
                    if ( || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
                        if (chosen.add(node)) {
                            .incrementAndGet();
                        }
                    }
                }
                InetAddress address;
                try {
                    address = hint.toInetAddress();
                }
                catch (UnknownHostException e) {
                    // skip addresses that don't resolve
                    continue;
                }
                // consider a split with a host hint without a port as being accessible
                // by all nodes in that host
                if (!hint.hasPort() || split.isRemotelyAccessible()) {
                    for (Node node : nodeMap.getNodesByHost().get(address)) {
                        if ( || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
                            if (chosen.add(node)) {
                                .incrementAndGet();
                            }
                        }
                    }
                }
            }
            // add nodes in same rack, if below the minimum count
            if (split.isRemotelyAccessible() && chosen.size() < ) {
                for (HostAddress hint : split.getAddresses()) {
                    InetAddress address;
                    try {
                        address = hint.toInetAddress();
                    }
                    catch (UnknownHostException e) {
                        // skip addresses that don't resolve
                        continue;
                    }
                    for (Node node : nodeMap.getNodesByRack().get(Rack.of(address))) {
                        if ( || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
                            if (chosen.add(node)) {
                                .incrementAndGet();
                            }
                            if (chosen.size() == ) {
                                break;
                            }
                        }
                    }
                    if (chosen.size() == ) {
                        break;
                    }
                }
            }
            // add some random nodes if below the minimum count
            if (split.isRemotelyAccessible()) {
                if (chosen.size() < ) {
                    for (Node node : lazyShuffle(nodeMap.getNodesByHost().values())) {
                        if ( || !coordinatorIdentifier.equals(node.getNodeIdentifier())) {
                            if (chosen.add(node)) {
                                .incrementAndGet();
                            }
                            if (chosen.size() == ) {
                                break;
                            }
                        }
                    }
                }
            }
            // if the chosen set is empty and the hint includes the coordinator,
            // force pick the coordinator
            if (chosen.isEmpty() && !) {
                final HostAddress coordinatorHostAddress = .getCurrentNode().getHostAndPort();
                if (FluentIterable.from(split.getAddresses()).anyMatch(new Predicate<HostAddress>() {
                    @Override
                    public boolean apply(HostAddress hostAddress)
                    {
                        // Exact match of the coordinator
                        if (hostAddress.equals(coordinatorHostAddress)) {
                            return true;
                        }
                        // If the split is remotely accessible or the split location doesn't specify a port,
                        // we can ignore the coordinator's port and match just the ip address
                        return (!hostAddress.hasPort() || split.isRemotelyAccessible()) &&
                                hostAddress.getHostText().equals(coordinatorHostAddress.getHostText());
                    }
                })) {
                   chosen.add(.getCurrentNode());
                }
            }
            return ImmutableList.copyOf(chosen);
        }
    }
    private static <T> Iterable<T> lazyShuffle(final Iterable<T> iterable)
    {
        return new Iterable<T>()
        {
            @Override
            public Iterator<T> iterator()
            {
                return new AbstractIterator<T>()
                {
                    List<T> list = Lists.newArrayList(iterable);
                    int limit = .size();
                    @Override
                    protected T computeNext()
                    {
                        if ( == 0) {
                            return endOfData();
                        }
                        int position = ThreadLocalRandom.current().nextInt();
                        T result = .get(position);
                        .set(position.get( - 1));
                        --;
                        return result;
                    }
                };
            }
        };
    }
    private static class NodeMap
    {
        private final SetMultimap<HostAddressNodenodesByHostAndPort;
        private final SetMultimap<InetAddressNodenodesByHost;
        private final SetMultimap<RackNodenodesByRack;
        public NodeMap(SetMultimap<HostAddressNodenodesByHostAndPortSetMultimap<InetAddressNodenodesByHostSetMultimap<RackNodenodesByRack)
        {
            this. = nodesByHostAndPort;
            this. = nodesByHost;
            this. = nodesByRack;
        }
        private SetMultimap<HostAddressNodegetNodesByHostAndPort()
        {
            return ;
        }
        public SetMultimap<InetAddressNodegetNodesByHost()
        {
            return ;
        }
        public SetMultimap<RackNodegetNodesByRack()
        {
            return ;
        }
    }
    private static class Rack
    {
        private int id;
        public static Rack of(InetAddress address)
        {
            // TODO: this needs to be pluggable
            int id = InetAddresses.coerceToInteger(address) & 0xFF_FF_FF_00;
            return new Rack(id);
        }
        private Rack(int id)
        {
            this. = id;
        }
        @Override
        public boolean equals(Object o)
        {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Rack rack = (Racko;
            if ( != rack.id) {
                return false;
            }
            return true;
        }
        @Override
        public int hashCode()
        {
            return ;
        }
    }
New to GrepCode? Check out our FAQ X