/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.helix.controller.strategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.ResourceAssignment;
import org.apache.log4j.Logger;

import com.google.common.base.Functions;
import com.google.common.collect.Lists;
public class AutoRebalanceStrategy {

  private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);

  private final String _resourceName;
  private final List<String> _partitions;
  private final LinkedHashMap<String, Integer> _states;
  private final int _maximumPerNode;
  private final ReplicaPlacementScheme _placementScheme;

  private Map<String, Node> _nodeMap;
  private List<Node> _liveNodesList;
  private Map<Integer, String> _stateMap;

  private Map<Replica, Node> _preferredAssignment;
  private Map<Replica, Node> _existingPreferredAssignment;
  private Map<Replica, Node> _existingNonPreferredAssignment;
  private Set<Replica> _orphaned;

  
  /**
   * Initialize this strategy for a resource
   * @param resourceName the resource for which an assignment will be computed
   * @param partitions the partition names for the resource
   * @param states the states and the number of replicas that should be in each state
   * @param maximumPerNode the maximum number of replicas any node can hold
   * @param placementScheme the scheme to use for preferred replica locations. If null, this is
   *          {@link DefaultPlacementScheme}
   */
  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
      final LinkedHashMap<String, Integer> states, int maximumPerNode,
      ReplicaPlacementScheme placementScheme) {
    _resourceName = resourceName;
    _partitions = partitions;
    _states = states;
    _maximumPerNode = maximumPerNode;
    if (placementScheme != null) {
      _placementScheme = placementScheme;
    } else {
      _placementScheme = new DefaultPlacementScheme();
    }
  }

  
 
  /**
   * Initialize this strategy for a resource with the default placement scheme and no upper
   * bound on the number of replicas per node
   * @param resourceName the resource for which an assignment will be computed
   * @param partitions the partition names for the resource
   * @param states the states and the number of replicas that should be in each state
   */
  public AutoRebalanceStrategy(String resourceName, final List<String> partitions,
      final LinkedHashMap<String, Integer> states) {
    this(resourceName, partitions, states, Integer.MAX_VALUE, new DefaultPlacementScheme());
  }

  
  /**
   * Constructor to support logically-typed Helix components
   * @param resourceId the resource for which to compute an assignment
   * @param partitions the partitions of the resource
   * @param states the states and counts for each state
   * @param maximumPerNode the maximum number of replicas per node
   * @param placementScheme the scheme to use for preferred replica locations. If null, this is
   *          {@link DefaultPlacementScheme}
   */
  public AutoRebalanceStrategy(ResourceId resourceId, final List<PartitionId> partitions,
      final LinkedHashMap<State, Integer> states, int maximumPerNode,
      ReplicaPlacementScheme placementScheme) {
    LinkedHashMap<String, Integer> rawStateCountMap = new LinkedHashMap<String, Integer>();
    for (State state : states.keySet()) {
      rawStateCountMap.put(state.toString(), states.get(state));
    }
    List<String> partitionNames = Lists.transform(partitions, Functions.toStringFunction());
    _resourceName = resourceId.stringify();
    _partitions = partitionNames;
    _states = rawStateCountMap;
    _maximumPerNode = maximumPerNode;
    if (placementScheme != null) {
      _placementScheme = placementScheme;
    } else {
      _placementScheme = new DefaultPlacementScheme();
    }
  }

  
  /**
   * Wrap {@link #computePartitionAssignment(java.util.List, java.util.Map, java.util.List)}
   * with a function that takes concrete types
   * @param liveNodes list of live participant ids
   * @param currentMapping map of partition id to map of participant id to state
   * @param allNodes list of all participant ids
   * @return the preference list and replica mapping
   */
  public ZNRecord typedComputePartitionAssignment(final List<ParticipantId> liveNodes,
      final Map<PartitionId, Map<ParticipantId, State>> currentMapping,
      final List<ParticipantId> allNodes) {
    final List<String> rawLiveNodes = Lists.transform(liveNodes, Functions.toStringFunction());
    final List<String> rawAllNodes = Lists.transform(allNodes, Functions.toStringFunction());
    final Map<String, Map<String, String>> rawCurrentMapping =
        ResourceAssignment.stringMapsFromReplicaMaps(currentMapping);
    return computePartitionAssignment(rawLiveNodes, rawCurrentMapping, rawAllNodes);
  }

  
  /**
   * Determine a preference list and mapping of partitions to nodes for all replicas
   * @param liveNodes the current list of live participants
   * @param currentMapping the current assignment of replicas to nodes
   * @param allNodes the full list of known nodes in the system
   * @return the preference list and replica mapping
   */
  public ZNRecord computePartitionAssignment(final List<String> liveNodes,
      final Map<String, Map<String, String>> currentMapping, final List<String> allNodes) {
    List<String> sortedLiveNodes = new ArrayList<String>(liveNodes);
    Collections.sort(sortedLiveNodes);
    List<String> sortedAllNodes = new ArrayList<String>(allNodes);
    Collections.sort(sortedAllNodes);
    int numReplicas = countStateReplicas();
    ZNRecord znRecord = new ZNRecord(_resourceName);
    if (sortedLiveNodes.size() == 0) {
      return znRecord;
    }
    int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
    int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
    _nodeMap = new HashMap<String, Node>();
    _liveNodesList = new ArrayList<Node>();

    for (String id : sortedAllNodes) {
      Node node = new Node(id);
      node.capacity = 0;
      node.hasCeilingCapacity = false;
      _nodeMap.put(id, node);
    }
    for (int i = 0; i < sortedLiveNodes.size(); i++) {
      boolean usingCeiling = false;
      int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
      if (distRemainder > 0 && targetSize < _maximumPerNode) {
        targetSize += 1;
        distRemainder = distRemainder - 1;
        usingCeiling = true;
      }
      Node node = _nodeMap.get(sortedLiveNodes.get(i));
      node.isAlive = true;
      node.capacity = targetSize;
      node.hasCeilingCapacity = usingCeiling;
      _liveNodesList.add(node);
    }

    // compute states for all replica ids
    _stateMap = generateStateMap();

    // compute the preferred mapping if all nodes were up
    _preferredAssignment = computePreferredPlacement(sortedAllNodes);

    // logger.info("preferred mapping:"+ preferredAssignment);
    // from current mapping derive the ones in preferred location
    // this will update the nodes with their current fill status
    _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);

    // from current mapping derive the ones not in preferred location
    _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);

    // compute orphaned replicas that are not assigned to any node
    _orphaned = computeOrphaned();
    if (logger.isInfoEnabled()) {
      logger.info("orphan = " + _orphaned);
    }

    moveNonPreferredReplicasToPreferred();
    assignOrphans();
    moveExcessReplicas();
    prepareResult(znRecord);
    return znRecord;
  }
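  /*
   * Usage sketch (illustrative, not part of the original source; the resource, partition, and
   * node names below are made up): compute an assignment for a resource with four partitions
   * and a MASTER/SLAVE state model across three live nodes, starting from no current mapping,
   * so every replica begins orphaned and is slotted in evenly.
   *
   *   List<String> partitions = Arrays.asList("p_0", "p_1", "p_2", "p_3");
   *   LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
   *   states.put("MASTER", 1); // one MASTER replica per partition
   *   states.put("SLAVE", 1);  // one SLAVE replica per partition
   *   AutoRebalanceStrategy strategy =
   *       new AutoRebalanceStrategy("testResource", partitions, states);
   *   List<String> nodes = Arrays.asList("node-0", "node-1", "node-2");
   *   ZNRecord result = strategy.computePartitionAssignment(nodes,
   *       Collections.<String, Map<String, String>> emptyMap(), nodes);
   *   // result.getListFields(): partition -> ordered node preference list
   *   // result.getMapFields():  partition -> (node -> state)
   */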

  
  /**
   * Move replicas assigned to non-preferred nodes if their current node is at capacity
   * and its preferred node is under capacity.
   */
  private void moveNonPreferredReplicasToPreferred() {
    // iterate through non preferred and see if we can move them to the
    // preferred location if the donor has more than it should and stealer has
    // enough capacity
    Iterator<Entry<Replica, Node>> iterator =
        _existingNonPreferredAssignment.entrySet().iterator();
    while (iterator.hasNext()) {
      Entry<Replica, Node> entry = iterator.next();
      Replica replica = entry.getKey();
      Node donor = entry.getValue();
      Node receiver = _preferredAssignment.get(replica);
      if (donor.capacity < donor.currentlyAssigned
          && receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
        donor.currentlyAssigned = donor.currentlyAssigned - 1;
        receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
        donor.nonPreferred.remove(replica);
        receiver.preferred.add(replica);
        donor.newReplicas.remove(replica);
        receiver.newReplicas.add(replica);
        iterator.remove();
      }
    }
  }

  
  /**
   * Slot in orphaned partitions randomly so as to maintain even load on live nodes.
   */
  private void assignOrphans() {
    // now iterate over nodes and remaining orphaned partitions and assign
    // partitions randomly
    // Better to iterate over orphaned partitions first
    Iterator<Replica> it = _orphaned.iterator();
    while (it.hasNext()) {
      Replica replica = it.next();
      boolean added = false;
      int startIndex = computeRandomStartIndex(replica);
      for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
        Node receiver = _liveNodesList.get(index % _liveNodesList.size());
        if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) {
          receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
          receiver.nonPreferred.add(replica);
          receiver.newReplicas.add(replica);
          added = true;
          break;
        }
      }
      if (!added) {
        // try adding the replica by making room for it
        added = assignOrphanByMakingRoom(replica);
      }
      if (added) {
        it.remove();
      }
    }
    if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
      logger.info("could not assign nodes to partitions: " + _orphaned);
    }
  }

  
  /**
   * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it
   * @param replica The replica to assign
   * @return true if the assignment succeeded, false otherwise
   */
  private boolean assignOrphanByMakingRoom(Replica replica) {
    Node capacityDonor = null;
    Node capacityAcceptor = null;
    int startIndex = computeRandomStartIndex(replica);
    for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
      Node current = _liveNodesList.get(index % _liveNodesList.size());
      if (current.hasCeilingCapacity && current.capacity > current.currentlyAssigned
          && !current.canAddIfCapacity(replica) && capacityDonor == null) {
        // this node has space but cannot accept the node
        capacityDonor = current;
      } else if (!current.hasCeilingCapacity && current.capacity == current.currentlyAssigned
          && current.canAddIfCapacity(replica) && capacityAcceptor == null) {
        // this node would be able to accept the replica if it has ceiling capacity
        capacityAcceptor = current;
      }
      if (capacityDonor != null && capacityAcceptor != null) {
        break;
      }
    }
    if (capacityDonor != null && capacityAcceptor != null) {
      // transfer ceiling capacity and add the node
      capacityAcceptor.steal(capacityDonor, replica);
      return true;
    }
    return false;
  }

  
  /**
   * Move replicas from too-full nodes to nodes that can accept the replicas
   */
  private void moveExcessReplicas() {
    // iterate over nodes and move extra load
    Iterator<Replica> it;
    for (Node donor : _liveNodesList) {
      if (donor.capacity < donor.currentlyAssigned) {
        Collections.sort(donor.nonPreferred);
        it = donor.nonPreferred.iterator();
        while (it.hasNext()) {
          Replica replica = it.next();
          int startIndex = computeRandomStartIndex(replica);
          for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) {
            Node receiver = _liveNodesList.get(index % _liveNodesList.size());
            if (receiver.canAdd(replica)) {
              receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
              receiver.nonPreferred.add(replica);
              donor.currentlyAssigned = donor.currentlyAssigned - 1;
              it.remove();
              break;
            }
          }
          if (donor.capacity >= donor.currentlyAssigned) {
            break;
          }
        }
        if (donor.capacity < donor.currentlyAssigned) {
          logger.warn("Could not take partitions out of node:" + donor.id);
        }
      }
    }
  }

  
  /**
   * Update a ZNRecord with the results of the rebalancing.
   * @param znRecord the record to update
   */
  private void prepareResult(ZNRecord znRecord) {
    // The map fields are keyed on partition name to a pair of node and state, i.e. it
    // indicates that the partition with given state is served by that node
    //
    // The list fields are also keyed on partition and list all the nodes serving that partition.
    // This is useful to verify that there is no node serving multiple replicas of the same
    // partition.
    Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>();
    for (String partition : _partitions) {
      znRecord.setMapField(partition, new TreeMap<String, String>());
      znRecord.setListField(partition, new ArrayList<String>());
      newPreferences.put(partition, new ArrayList<String>());
    }

    // for preference lists, the rough priority that we want is:
    // [existing preferred, existing non-preferred, non-existing preferred, non-existing
    // non-preferred]
    for (Node node : _liveNodesList) {
      for (Replica replica : node.preferred) {
        if (node.newReplicas.contains(replica)) {
          newPreferences.get(replica.partition).add(node.id);
        } else {
          znRecord.getListField(replica.partition).add(node.id);
        }
      }
    }
    for (Node node : _liveNodesList) {
      for (Replica replica : node.nonPreferred) {
        if (node.newReplicas.contains(replica)) {
          newPreferences.get(replica.partition).add(node.id);
        } else {
          znRecord.getListField(replica.partition).add(node.id);
        }
      }
    }
    normalizePreferenceLists(znRecord.getListFields(), newPreferences);

    // generate preference maps based on the preference lists
    for (String partition : _partitions) {
      List<String> preferenceList = znRecord.getListField(partition);
      int i = 0;
      for (String participant : preferenceList) {
        znRecord.getMapField(partition).put(participant, _stateMap.get(i));
        i++;
      }
    }
  }

  
  /**
   * Adjust preference lists to reduce the number of same replicas on an instance. This will
   * separately normalize two sets of preference lists, and then append the results of the
   * second set to those of the first. This basically ensures that existing replicas are
   * automatically preferred.
   * @param preferenceLists map of (partition --> list of nodes)
   * @param newPreferences map containing node preferences not consistent with the current
   *          assignment
   */
  private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
      Map<String, List<String>> newPreferences) {
    Map<String, Map<String, Integer>> nodeReplicaCounts =
        new HashMap<String, Map<String, Integer>>();
    for (String partition : preferenceLists.keySet()) {
      normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
    }
    for (String partition : newPreferences.keySet()) {
      normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
      preferenceLists.get(partition).addAll(newPreferences.get(partition));
    }
  }

  
  /**
   * Adjust a single preference list for replica assignment imbalance
   * @param preferenceList list of node names
   * @param nodeReplicaCounts map of (node --> state --> count)
   */
  private void normalizePreferenceList(List<String> preferenceList,
      Map<String, Map<String, Integer>> nodeReplicaCounts) {
    // make this a LinkedHashSet to preserve iteration order
    Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
    List<String> newPreferenceList = new ArrayList<String>();
    int replicas = Math.min(countStateReplicas(), preferenceList.size());
    for (int i = 0; i < replicas; i++) {
      String state = _stateMap.get(i);
      String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts);
      newPreferenceList.add(node);
      notAssigned.remove(node);
      Map<String, Integer> counts = nodeReplicaCounts.get(node);
      counts.put(state, counts.get(state) + 1);
    }
    preferenceList.clear();
    preferenceList.addAll(newPreferenceList);
  }

  
  /**
   * Get the node which hosts the fewest of a given replica
   * @param state the state
   * @param nodes nodes to check
   * @param nodeReplicaCounts current assignment of replicas
   * @return the node most willing to accept the replica
   */
  private String getMinimumNodeForReplica(String state, Set<String> nodes,
      Map<String, Map<String, Integer>> nodeReplicaCounts) {
    String minimalNode = null;
    int minimalCount = Integer.MAX_VALUE;
    for (String node : nodes) {
      int count = getReplicaCountForNode(state, node, nodeReplicaCounts);
      if (count < minimalCount) {
        minimalCount = count;
        minimalNode = node;
      }
    }
    return minimalNode;
  }

  
  /**
   * Safe check for the number of replicas of a given id assigned to a node
   * @param state the state to assign
   * @param node the node to check
   * @param nodeReplicaCounts a map of node to replica id and counts
   * @return the number of currently assigned replicas of the given id
   */
  private int getReplicaCountForNode(String state, String node,
      Map<String, Map<String, Integer>> nodeReplicaCounts) {
    if (!nodeReplicaCounts.containsKey(node)) {
      Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
      replicaCounts.put(state, 0);
      nodeReplicaCounts.put(node, replicaCounts);
      return 0;
    }
    Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
    if (!replicaCounts.containsKey(state)) {
      replicaCounts.put(state, 0);
      return 0;
    }
    return replicaCounts.get(state);
  }

  
  /**
   * Compute the subset of the current mapping where replicas are not mapped according to their
   * preferred assignment.
   * @param currentMapping Current mapping of replicas to nodes
   * @return The current assignments that do not conform to the preferred assignment
   */
  private Map<Replica, Node> computeExistingNonPreferredPlacement(
      Map<String, Map<String, String>> currentMapping) {
    Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
    int count = countStateReplicas();
    for (String partition : currentMapping.keySet()) {
      Map<String, String> nodeStateMap = currentMapping.get(partition);
      for (String nodeId : nodeStateMap.keySet()) {
        Node node = _nodeMap.get(nodeId);
        boolean skip = false;
        for (Replica replica : node.preferred) {
          if (replica.partition.equals(partition)) {
            skip = true;
            break;
          }
        }
        if (skip) {
          continue;
        }
        // check if its in one of the preferred position
        for (int replicaId = 0; replicaId < count; replicaId++) {
          Replica replica = new Replica(partition, replicaId);
          if (_preferredAssignment.get(replica).id != node.id
              && !_existingPreferredAssignment.containsKey(replica)
              && !existingNonPreferredAssignment.containsKey(replica)) {
            existingNonPreferredAssignment.put(replica, node);
            node.nonPreferred.add(replica);
            break;
          }
        }
      }
    }
    return existingNonPreferredAssignment;
  }

  
  /**
   * Get a live node index to try first for a replica so that each possible start index is
   * roughly uniformly assigned.
   * @param replica The replica to assign
   * @return The starting node index to try
   */
  private int computeRandomStartIndex(final Replica replica) {
    // mask off the sign bit so the modulus is always non-negative
    return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
  }

  
  /**
   * Get a set of replicas not currently assigned to any node
   * @return Unassigned replicas
   */
  private Set<Replica> computeOrphaned() {
    Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet());
    for (Replica r : _existingPreferredAssignment.keySet()) {
      if (orphanedPartitions.contains(r)) {
        orphanedPartitions.remove(r);
      }
    }
    for (Replica r : _existingNonPreferredAssignment.keySet()) {
      if (orphanedPartitions.contains(r)) {
        orphanedPartitions.remove(r);
      }
    }
    return orphanedPartitions;
  }

  
  /**
   * Determine the replicas already assigned to their preferred nodes
   * @param currentMapping Current assignment of replicas to nodes
   * @return Assignments that conform to the preferred placement
   */
  private Map<Replica, Node> computeExistingPreferredPlacement(
      final Map<String, Map<String, String>> currentMapping) {
    Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
    int count = countStateReplicas();
    for (String partition : currentMapping.keySet()) {
      Map<String, String> nodeStateMap = currentMapping.get(partition);
      for (String nodeId : nodeStateMap.keySet()) {
        Node node = _nodeMap.get(nodeId);
        node.currentlyAssigned = node.currentlyAssigned + 1;
        // check if its in one of the preferred position
        for (int replicaId = 0; replicaId < count; replicaId++) {
          Replica replica = new Replica(partition, replicaId);
          if (_preferredAssignment.containsKey(replica)
              && !existingPreferredAssignment.containsKey(replica)
              && _preferredAssignment.get(replica).id == node.id) {
            existingPreferredAssignment.put(replica, node);
            node.preferred.add(replica);
            break;
          }
        }
      }
    }
    return existingPreferredAssignment;
  }

  
  /**
   * Given a predefined set of all possible nodes, compute an assignment of replicas to nodes
   * that evenly assigns all replicas to nodes.
   * @param allNodes Identifiers to all nodes, live and non-live
   * @return Preferred assignment of replicas
   */
  private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
    Map<Replica, Node> preferredMapping;
    preferredMapping = new HashMap<Replica, Node>();
    int partitionId = 0;
    int numReplicas = countStateReplicas();
    int count = countStateReplicas();
    for (String partition : _partitions) {
      for (int replicaId = 0; replicaId < count; replicaId++) {
        Replica replica = new Replica(partition, replicaId);
        String nodeName =
            _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
                allNodes);
        preferredMapping.put(replica, _nodeMap.get(nodeName));
      }
      partitionId = partitionId + 1;
    }
    return preferredMapping;
  }

  
  /**
   * Counts the total number of replicas given a state-count mapping
   * @return the total number of replicas across all states
   */
  private int countStateReplicas() {
    int total = 0;
    for (Integer count : _states.values()) {
      total += count;
    }
    return total;
  }

  
  /**
   * Compute a map of replica ids to state names
   * @return Map: replica id -> state name
   */
  private Map<Integer, String> generateStateMap() {
    int replicaId = 0;
    Map<Integer, String> stateMap = new HashMap<Integer, String>();
    for (String state : _states.keySet()) {
      Integer count = _states.get(state);
      for (int i = 0; i < count; i++) {
        stateMap.put(replicaId, state);
        replicaId++;
      }
    }
    return stateMap;
  }
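  /*
   * Illustrative example (assuming _states = {MASTER=1, SLAVE=2}): generateStateMap() returns
   * {0=MASTER, 1=SLAVE, 2=SLAVE}, i.e. replica id 0 of every partition is the MASTER and
   * replica ids 1 and 2 are SLAVEs. Iteration order matters here, which is why _states is a
   * LinkedHashMap rather than a plain HashMap.
   */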

  
  /**
   * A Node is an entity that can serve replicas. It has a capacity and knowledge of replicas
   * assigned to it, so it can decide if it can receive additional replicas.
   */
  class Node {
    public int currentlyAssigned;
    public int capacity;
    public boolean hasCeilingCapacity;
    private String id;
    boolean isAlive;
    private List<Replica> preferred;
    private List<Replica> nonPreferred;
    private Set<Replica> newReplicas;

    public Node(String id) {
      preferred = new ArrayList<Replica>();
      nonPreferred = new ArrayList<Replica>();
      newReplicas = new TreeSet<Replica>();
      currentlyAssigned = 0;
      isAlive = false;
      this.id = id;
    }

    /**
     * Check if this replica can be legally added to this node
     * @param replica The replica to test
     * @return true if the assignment can be made, false otherwise
     */
    public boolean canAdd(Replica replica) {
      if (currentlyAssigned >= capacity) {
        return false;
      }
      return canAddIfCapacity(replica);
    }

    /**
     * Check if this replica can be legally added to this node, provided that it has enough
     * capacity.
     * @param replica The replica to test
     * @return true if the assignment can be made, false otherwise
     */
    public boolean canAddIfCapacity(Replica replica) {
      if (!isAlive) {
        return false;
      }
      for (Replica r : preferred) {
        if (r.partition.equals(replica.partition)) {
          return false;
        }
      }
      for (Replica r : nonPreferred) {
        if (r.partition.equals(replica.partition)) {
          return false;
        }
      }
      return true;
    }

    /**
     * Receive a replica by stealing capacity from another Node
     * @param donor The node that has excess capacity
     * @param replica The replica to receive
     */
    public void steal(Node donor, Replica replica) {
      donor.hasCeilingCapacity = false;
      donor.capacity--;
      hasCeilingCapacity = true;
      capacity++;
      currentlyAssigned++;
      nonPreferred.add(replica);
      newReplicas.add(replica);
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size())
          .append("\nnonpreferred:").append(nonPreferred.size());
      return sb.toString();
    }
  }

  
  /**
   * A Replica is a combination of a partition of the resource, the state the replica is in
   * and an identifier signifying a specific replica of a given partition and state.
   */
  class Replica implements Comparable<Replica> {
    private String partition;
    private int replicaId; // this is a partition-relative id
    private String format;

    public Replica(String partition, int replicaId) {
      this.partition = partition;
      this.replicaId = replicaId;
      this.format = this.partition + "|" + this.replicaId;
    }

    @Override
    public String toString() {
      return format;
    }

    @Override
    public boolean equals(Object that) {
      if (that instanceof Replica) {
        return this.format.equals(((Replica) that).format);
      }
      return false;
    }

    @Override
    public int hashCode() {
      return this.format.hashCode();
    }

    @Override
    public int compareTo(Replica that) {
      if (that instanceof Replica) {
        return this.format.compareTo(that.format);
      }
      return -1;
    }
  }
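  /*
   * For illustration: new Replica("p_3", 1) has format "p_3|1", and equality, hashing, and
   * ordering are all delegated to that string. Two Replica instances built from the same
   * partition name and partition-relative id are therefore interchangeable as keys in the
   * TreeMap- and TreeSet-based assignment structures above.
   */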

  
  /**
   * Interface for providing a custom approach to computing a replica's affinity to a node.
   */
  public interface ReplicaPlacementScheme {
    /**
     * Initialize global state
     * @param manager The instance to which this placement is associated
     */
    public void init(final HelixManager manager);

    /**
     * Given properties of this replica, determine the node it would prefer to be served by
     * @param partitionId The current partition
     * @param replicaId The current replica with respect to the current partition
     * @param numPartitions The total number of partitions
     * @param numReplicas The total number of replicas per partition
     * @param nodeNames A list of identifiers of all nodes, live and non-live
     * @return The name of the node that would prefer to serve this replica
     */
    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
        final List<String> nodeNames);
  }

  
  /**
   * Compute preferred placements based on a default strategy that assigns replicas to nodes as
   * evenly as possible while avoiding placing two replicas of the same partition on any node.
   */
  public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
    @Override
    public void init(final HelixManager manager) {
      // do nothing since this is independent of the manager
    }

    @Override
    public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
        final List<String> nodeNames) {
      int index;
      if (nodeNames.size() > numPartitions) {
        // assign replicas in partition order in case there are more nodes than partitions
        index = (partitionId + replicaId * numPartitions) % nodeNames.size();
      } else if (nodeNames.size() == numPartitions) {
        // need a replica offset in case the sizes of these sets are the same
        index =
            ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId)
                % nodeNames.size();
      } else {
        // in all other cases, assigning a replica at a time for each partition is reasonable
        index = (partitionId + replicaId) % nodeNames.size();
      }
      return nodeNames.get(index);
    }
  }
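  /*
   * Worked example for getLocation (illustrative numbers): with numPartitions = 4,
   * numReplicas = 2, and 2 nodes, nodeNames.size() < numPartitions, so
   * index = (partitionId + replicaId) % 2. Partition 0 prefers nodes [0, 1], partition 1
   * prefers [1, 0], and so on: the replicas of each partition land on distinct nodes while
   * load stays even across nodes.
   */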
}