Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.helix.controller.strategy;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import java.util.List;
 import java.util.Map;
 
 
 public class RUSHMasterSlaveStrategy {
  
Build the config map for RUSH algorithm. The input of RUSH algorithm groups nodes into "cluster"s, and different clusters can be assigned with different weights.

Parameters:
numClusters number of node clusters
instancesPerCluster List of clusters, each contain a list of node name strings.
replicationDegree the replication degree
clusterWeights the weight for each node cluster
Returns:
this config map structure for RUSH algorithm.
 
   static HashMap<StringObjectbuildRushConfig(int numClusters,
       List<List<String>> instancesPerClusterint replicationDegreeList<IntegerclusterWeights) {
     HashMap<StringObjectconfig = new HashMap<StringObject>();
     config.put("replicationDegree"replicationDegree);
 
     HashMap[] clusterList = new HashMap[numClusters];
     config.put("subClusters"clusterList);
 
     HashMap[] nodes;
     HashMap<StringStringnode;
     HashMap<StringObjectclusterData;
     for (int n = 0; n < numClustersn++) {
       int numNodes = instancesPerCluster.get(n).size();
       nodes = new HashMap[numNodes];
       for (int i = 0; i < numNodesi++) {
         node = new HashMap<StringString>();
         node.put("partition"instancesPerCluster.get(n).get(i));
         nodes[i] = node;
       }
       clusterData = new HashMap<StringObject>();
       clusterData.put("weight"clusterWeights.get(n));
       clusterData.put("nodes"nodes);
       clusterList[n] = clusterData;
     }
     return config;
   }

  
Calculate the ideal state for list of instances clusters.

Parameters:
numClusters number of node clusters
instanceClusters List of clusters, each contain a list of node name strings.
instanceClusterWeights the weight for each instance cluster
partitions the partition number of the database
replicas the replication degree
resourceName the name of the database
Returns:
The ZNRecord that contains the ideal state
 
   public static ZNRecord calculateIdealState(List<List<String>> instanceClusters,
       List<IntegerinstanceClusterWeightsint partitionsint replicasString resourceName)
       throws Exception {
     ZNRecord result = new ZNRecord(resourceName);
 
     int numberOfClusters = instanceClusters.size();
     List<List<String>> nodesInClusters = instanceClusters;
     List<IntegerclusterWeights = instanceClusterWeights;
 
     HashMap<StringObjectrushConfig =
        buildRushConfig(numberOfClustersnodesInClustersreplicas + 1, clusterWeights);
    RUSHrHash rushHash = new RUSHrHash(rushConfig);
    Random r = new Random(0);
    for (int i = 0; i < partitionsi++) {
      int partitionId = i;
      String partitionName = resourceName + ".partition-" + partitionId;
      ArrayList<HashMappartitionAssignmentResult = rushHash.findNode(i);
      List<StringnodeNames = new ArrayList<String>();
      for (HashMap<?, ?> p : partitionAssignmentResult) {
        for (Object key : p.keySet()) {
          if (p.get(keyinstanceof String) {
            nodeNames.add(p.get(key).toString());
          }
        }
      }
      Map<StringStringpartitionAssignment = new TreeMap<StringString>();
      for (int j = 0; j < nodeNames.size(); j++) {
        partitionAssignment.put(nodeNames.get(j), "SLAVE");
      }
      int master = r.nextInt(nodeNames.size());
      // master = nodeNames.size()/2;
      partitionAssignment.put(nodeNames.get(master), "MASTER");
      result.setMapField(partitionNamepartitionAssignment);
    }
    result.setSimpleField(..toString(), String.valueOf(partitions));
    return result;
  }
  public static ZNRecord calculateIdealState(List<StringinstanceClusters,
      int instanceClusterWeightint partitionsint replicasString resourceName)
      throws Exception {
    List<List<String>> instanceClustersList = new ArrayList<List<String>>();
    instanceClustersList.add(instanceClusters);
    List<IntegerinstanceClusterWeightList = new ArrayList<Integer>();
    instanceClusterWeightList.add(instanceClusterWeight);
    return calculateIdealState(instanceClustersListinstanceClusterWeightListpartitions,
        replicasresourceName);
  }

  
Helper function to see how many partitions are mapped to different instances in two ideal states
  public static void printDiff(ZNRecord record1ZNRecord record2) {
    int diffCount = 0;
    int diffCountMaster = 0;
    for (String key : record1.getMapFields().keySet()) {
      Map<StringStringmap1 = record1.getMapField(key);
      Map<StringStringmap2 = record2.getMapField(key);
      for (String k : map1.keySet()) {
        if (!map2.containsKey(k)) {
          diffCount++;
        } else if (!map1.get(k).equalsIgnoreCase(map2.get(k))) {
          diffCountMaster++;
        }
      }
    }
    ..println("\ndiff count = " + diffCount);
    ..println("\nmaster diff count:" + diffCountMaster);
  }

  
Helper function to calculate and print the standard deviation of the partition assignment ideal state.
  public static void printIdealStateStats(ZNRecord record) {
    Map<StringIntegercountsMap = new TreeMap<StringInteger>();
    Map<StringIntegermasterCountsMap = new TreeMap<StringInteger>();
    for (String key : record.getMapFields().keySet()) {
      Map<StringStringmap1 = record.getMapField(key);
      for (String k : map1.keySet()) {
        if (!countsMap.containsKey(k)) {
          countsMap.put(knew Integer(0));
        } else {
          countsMap.put(kcountsMap.get(k).intValue() + 1);
        }
        if (!masterCountsMap.containsKey(k)) {
          masterCountsMap.put(knew Integer(0));
        } else if (map1.get(k).equalsIgnoreCase("MASTER")) {
          masterCountsMap.put(kmasterCountsMap.get(k).intValue() + 1);
        }
      }
    }
    double sum = 0;
    int maxCount = 0;
    int minCount = .;
    for (String k : countsMap.keySet()) {
      int count = countsMap.get(k);
      sum += count;
      if (maxCount < count) {
        maxCount = count;
      }
      if (minCount > count) {
        minCount = count;
      }
      ..print(count + " ");
    }
    ..println("\nMax count: " + maxCount + " min count:" + minCount);
    ..println("\n master:");
    double sumMaster = 0;
    int maxCountMaster = 0;
    int minCountMaster = .;
    for (String k : masterCountsMap.keySet()) {
      int count = masterCountsMap.get(k);
      sumMaster += count;
      if (maxCountMaster < count) {
        maxCountMaster = count;
      }
      if (minCountMaster > count) {
        minCountMaster = count;
      }
      ..print(count + " ");
    }
    ..println("\nMean master: " + 1.0 * sumMaster / countsMap.size());
    ..println("Max master count: " + maxCountMaster + " min count:" + minCountMaster);
    double mean = sum / (countsMap.size());
    // calculate the deviation of the node distribution
    double deviation = 0;
    for (String k : countsMap.keySet()) {
      double count = countsMap.get(k);
      deviation += (count - mean) * (count - mean);
    }
    ..println("Mean: " + mean + " normal deviation:"
        + Math.sqrt(deviation / countsMap.size()) / mean);
    // System.out.println("Max count: " + maxCount + " min count:" + minCount);
    int steps = 10;
    int stepLen = (maxCount - minCount) / steps;
    if (stepLen == 0)
      return;
    List<Integerhistogram = new ArrayList<Integer>((maxCount - minCount) / stepLen + 1);
    for (int i = 0; i < (maxCount - minCount) / stepLen + 1; i++) {
      histogram.add(0);
    }
    for (String k : countsMap.keySet()) {
      int count = countsMap.get(k);
      int stepNo = (count - minCount) / stepLen;
      histogram.set(stepNohistogram.get(stepNo) + 1);
    }
    ..println("histogram:");
    for (Integer x : histogram) {
      ..print(x + " ");
    }
  }
  public static void main(String args[]) throws Exception {
    int partitions = 4096, replicas = 2;
    String resourceName = "espressoDB1";
    List<StringinstanceNames = new ArrayList<String>();
    List<List<String>> instanceCluster1 = new ArrayList<List<String>>();
    for (int i = 0; i < 20; i++) {
      instanceNames.add("local" + i + "host_123" + i);
    }
    instanceCluster1.add(instanceNames);
    List<Integerweights1 = new ArrayList<Integer>();
    weights1.add(1);
    ZNRecord result =
        RUSHMasterSlaveStrategy.calculateIdealState(instanceCluster1weights1partitions,
            replicasresourceName);
    printIdealStateStats(result);
    List<StringinstanceNames2 = new ArrayList<String>();
    for (int i = 400; i < 405; i++) {
      instanceNames2.add("localhost_123" + i);
    }
    instanceCluster1.add(instanceNames2);
    weights1.add(1);
    ZNRecord result2 =
        RUSHMasterSlaveStrategy.calculateIdealState(instanceCluster1weights1partitions,
            replicasresourceName);
    printDiff(resultresult2);
    printIdealStateStats(result2);
  }
New to GrepCode? Check out our FAQ X