/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.memory;

import com.facebook.presto.ExceededMemoryLimitException;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.QueryExecution;
import com.facebook.presto.metadata.NodeManager;
import com.facebook.presto.spi.Node;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.http.client.HttpClient;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.DataSize;
import org.weakref.jmx.JmxException;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.Managed;
import org.weakref.jmx.ObjectNames;

import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.facebook.presto.memory.LocalMemoryManager.RESERVED_POOL;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet;
import static com.google.common.collect.Sets.difference;
import static java.util.Objects.requireNonNull;
 
/**
 * Tracks memory usage across the cluster, enforces the per-query memory limit,
 * and assigns running queries to memory pools on the worker nodes.
 */
public class ClusterMemoryManager
{
    private static final Logger log = Logger.get(ClusterMemoryManager.class);
    private final NodeManager nodeManager;
    private final LocationFactory locationFactory;
    private final HttpClient httpClient;
    private final MBeanExporter exporter;
    private final JsonCodec<MemoryInfo> memoryInfoCodec;
    private final JsonCodec<MemoryPoolAssignmentsRequest> assignmentsRequestJsonCodec;
    private final DataSize maxQueryMemory;
    private final boolean enabled;
    private final AtomicLong memoryPoolAssignmentsVersion = new AtomicLong();
    private final AtomicLong clusterMemoryUsageBytes = new AtomicLong();
    private final AtomicLong clusterMemoryBytes = new AtomicLong();
    private final Map<String, RemoteNodeMemory> nodes = new HashMap<>();

    @GuardedBy("this")
    private final Map<MemoryPoolId, ClusterMemoryPool> pools = new HashMap<>();
 
    @Inject
    public ClusterMemoryManager(
            @ForMemoryManager HttpClient httpClient,
            NodeManager nodeManager,
            LocationFactory locationFactory,
            MBeanExporter exporter,
            JsonCodec<MemoryInfo> memoryInfoCodec,
            JsonCodec<MemoryPoolAssignmentsRequest> assignmentsRequestJsonCodec,
            MemoryManagerConfig config)
    {
        requireNonNull(config, "config is null");
        this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
        this.locationFactory = requireNonNull(locationFactory, "locationFactory is null");
        this.httpClient = requireNonNull(httpClient, "httpClient is null");
        this.exporter = requireNonNull(exporter, "exporter is null");
        this.memoryInfoCodec = requireNonNull(memoryInfoCodec, "memoryInfoCodec is null");
        this.assignmentsRequestJsonCodec = requireNonNull(assignmentsRequestJsonCodec, "assignmentsRequestJsonCodec is null");
        this.maxQueryMemory = config.getMaxQueryMemory();
        this.enabled = config.isClusterMemoryManagerEnabled();
    }
 
    // Fails any query that exceeds the per-query memory limit, records total cluster memory usage,
    // and pushes updated memory pool assignments to all worker nodes.
    public void process(Iterable<QueryExecution> queries)
    {
        if (!enabled) {
            return;
        }
        long totalBytes = 0;
        for (QueryExecution query : queries) {
            long bytes = query.getTotalMemoryReservation();
            totalBytes += bytes;
            if (bytes > maxQueryMemory.toBytes()) {
                query.fail(new ExceededMemoryLimitException("Query", maxQueryMemory));
            }
        }
        clusterMemoryUsageBytes.set(totalBytes);

        Map<MemoryPoolId, Integer> countByPool = new HashMap<>();
        for (QueryExecution query : queries) {
            MemoryPoolId id = query.getMemoryPool().getId();
            countByPool.put(id, countByPool.getOrDefault(id, 0) + 1);
        }

        updatePools(countByPool);
        updateNodes(updateAssignments(queries));
    }

    synchronized Map<MemoryPoolId, ClusterMemoryPool> getPools()
    {
        return ImmutableMap.copyOf(pools);
    }
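
    // Computes the pool assignment for every running query; when the reserved pool is empty and the
    // general pool has blocked nodes, the single biggest query is moved into the reserved pool.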
    private synchronized MemoryPoolAssignmentsRequest updateAssignments(Iterable<QueryExecution> queries)
    {
        ClusterMemoryPool reservedPool = pools.get(RESERVED_POOL);
        ClusterMemoryPool generalPool = pools.get(GENERAL_POOL);
        long version = memoryPoolAssignmentsVersion.incrementAndGet();
        // Check that all previous assignments have propagated to the visible nodes. This doesn't account for temporary network issues,
        // and is more of a safety check than a guarantee
        if (reservedPool != null && generalPool != null && allAssignmentsHavePropagated(queries)) {
            if (reservedPool.getQueries() == 0 && generalPool.getBlockedNodes() > 0) {
                QueryExecution biggestQuery = null;
                long maxMemory = -1;
                for (QueryExecution queryExecution : queries) {
                    long bytesUsed = queryExecution.getTotalMemoryReservation();
                    if (bytesUsed > maxMemory) {
                        biggestQuery = queryExecution;
                        maxMemory = bytesUsed;
                    }
                }
                for (QueryExecution queryExecution : queries) {
                    if (queryExecution.getQueryId().equals(biggestQuery.getQueryId())) {
                        queryExecution.setMemoryPool(new VersionedMemoryPoolId(RESERVED_POOL, version));
                    }
                }
            }
        }

        ImmutableList.Builder<MemoryPoolAssignment> assignments = ImmutableList.builder();
        for (QueryExecution queryExecution : queries) {
            assignments.add(new MemoryPoolAssignment(queryExecution.getQueryId(), queryExecution.getMemoryPool().getId()));
        }
        return new MemoryPoolAssignmentsRequest(version, assignments.build());
    }
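
    // Returns true only when every visible node has acknowledged an assignment version at least as new
    // as the oldest pool-assignment version still referenced by a running query.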
    private boolean allAssignmentsHavePropagated(Iterable<QueryExecution> queries)
    {
        if (nodes.isEmpty()) {
            // Assignments can't have propagated if there are no visible nodes
            return false;
        }
        long newestAssignment = ImmutableList.copyOf(queries).stream()
                .map(QueryExecution::getMemoryPool)
                .mapToLong(VersionedMemoryPoolId::getVersion)
                .min()
                .orElse(-1);
        long mostOutOfDateNode = nodes.values().stream()
                .mapToLong(RemoteNodeMemory::getCurrentAssignmentVersion)
                .min()
                .orElse(Long.MIN_VALUE);
        return newestAssignment <= mostOutOfDateNode;
    }
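
    // Keeps the per-node memory trackers in sync with the set of active nodes and schedules an
    // asynchronous refresh that pushes the latest pool assignments to each node.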
    private void updateNodes(MemoryPoolAssignmentsRequest assignments)
    {
        Set<Node> activeNodes = nodeManager.getActiveNodes();
        ImmutableSet<String> activeNodeIds = activeNodes.stream()
                .map(Node::getNodeIdentifier)
                .collect(toImmutableSet());

        // Remove nodes that don't exist anymore
        // Make a copy to materialize the set difference, so we don't modify the map while iterating its view
        Set<String> deadNodes = ImmutableSet.copyOf(difference(nodes.keySet(), activeNodeIds));
        nodes.keySet().removeAll(deadNodes);

        // Add new nodes
        for (Node node : activeNodes) {
            if (!nodes.containsKey(node.getNodeIdentifier())) {
                nodes.put(node.getNodeIdentifier(), new RemoteNodeMemory(httpClient, memoryInfoCodec, assignmentsRequestJsonCodec, locationFactory.createMemoryInfoLocation(node)));
            }
        }

        // Schedule refresh
        for (RemoteNodeMemory node : nodes.values()) {
            node.asyncRefresh(assignments);
        }
    }
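
    // Rebuilds the cluster-wide view of memory from the latest per-node memory info: updates total
    // cluster memory, creates or removes ClusterMemoryPools, and exports each pool as an MBean.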
    private synchronized void updatePools(Map<MemoryPoolId, Integer> queryCounts)
    {
        // Update view of cluster memory and pools
        List<MemoryInfo> nodeMemoryInfos = nodes.values().stream()
                .map(RemoteNodeMemory::getInfo)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(toImmutableList());

        long totalClusterMemory = nodeMemoryInfos.stream()
                .map(MemoryInfo::getTotalNodeMemory)
                .mapToLong(DataSize::toBytes)
                .sum();
        clusterMemoryBytes.set(totalClusterMemory);

        Set<MemoryPoolId> activePoolIds = nodeMemoryInfos.stream()
                .flatMap(info -> info.getPools().keySet().stream())
                .collect(toImmutableSet());

        // Make a copy to materialize the set difference before removing pools from the backing map
        Set<MemoryPoolId> removedPools = ImmutableSet.copyOf(difference(pools.keySet(), activePoolIds));
        for (MemoryPoolId removed : removedPools) {
            unexport(pools.get(removed));
            pools.remove(removed);
        }
        for (MemoryPoolId id : activePoolIds) {
            ClusterMemoryPool pool = pools.computeIfAbsent(id, poolId -> {
                ClusterMemoryPool newPool = new ClusterMemoryPool(poolId);
                String objectName = ObjectNames.builder(ClusterMemoryPool.class, newPool.getId().toString()).build();
                try {
                    exporter.export(objectName, newPool);
                }
                catch (JmxException e) {
                    log.error(e, "Error exporting memory pool %s", poolId);
                }
                return newPool;
            });
            pool.update(nodeMemoryInfos, queryCounts.getOrDefault(pool.getId(), 0));
        }
    }
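
    // Unexports all memory pool MBeans when the server is shut down.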
    @PreDestroy
    public synchronized void destroy()
    {
        for (ClusterMemoryPool pool : pools.values()) {
            unexport(pool);
        }
        pools.clear();
    }

    private void unexport(ClusterMemoryPool pool)
    {
        try {
            String objectName = ObjectNames.builder(ClusterMemoryPool.class, pool.getId().toString()).build();
            exporter.unexport(objectName);
        }
        catch (JmxException e) {
            log.error(e, "Failed to unexport pool %s", pool.getId());
        }
    }

    @Managed
    public long getClusterMemoryUsageBytes()
    {
        return clusterMemoryUsageBytes.get();
    }

    @Managed
    public long getClusterMemoryBytes()
    {
        return clusterMemoryBytes.get();
    }
}