// GrepCode page chrome (not part of the source file): "Start line", "End line",
// "Snippet Preview", "Snippet HTML Code", "Stack Overflow Questions".
  package org.apache.helix.controller.stages;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import java.util.Map;
 import java.util.Set;
 
 
For partition compute best possible (instance,state) pair based on IdealState,StateModel,LiveInstance
 
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
   private static final Logger LOG = Logger.getLogger(BestPossibleStateCalcStage.class.getName());
 
   @Override
   public void process(ClusterEvent eventthrows Exception {
     long startTime = System.currentTimeMillis();
     if (.isInfoEnabled()) {
       .info("START BestPossibleStateCalcStage.process()");
     }
 
     ResourceCurrentState currentStateOutput =
     Map<ResourceIdResourceConfigresourceMap =
         event.getAttribute(..toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
 
     if (currentStateOutput == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
           + ". Requires CURRENT_STATE|RESOURCES|DataCache");
     }
 
     BestPossibleStateOutput bestPossibleStateOutput =
         compute(clustereventresourceMapcurrentStateOutput);
     event.addAttribute(..toString(), bestPossibleStateOutput);
 
     long endTime = System.currentTimeMillis();
     if (.isInfoEnabled()) {
       .info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
     }
   }

  
Fallback for cases when the resource has been dropped, but current state exists

Parameters:
cluster cluster snapshot
resourceId the resource for which to generate an assignment
currentStateOutput full snapshot of the current state
stateModelDef state model the resource follows
Returns:
assignment for the dropped resource
 
   private ResourceAssignment mapDroppedResource(Cluster clusterResourceId resourceId,
       ResourceCurrentState currentStateOutputStateModelDefinition stateModelDef) {
     ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
     Set<PartitionIdmappedPartitions =
         currentStateOutput.getCurrentStateMappedPartitions(resourceId);
     if (mappedPartitions == null) {
       return partitionMapping;
     }
     for (PartitionId partitionId : mappedPartitions) {
       Set<ParticipantIddisabledParticipantsForPartition =
          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
              partitionId);
      Map<StateStringupperBounds =
          ConstraintBasedAssignment
              .stateConstraints(stateModelDefresourceIdcluster.getConfig());
      partitionMapping.addReplicaMap(partitionId, ConstraintBasedAssignment
          .computeAutoBestStateForPartition(upperBoundscluster.getLiveParticipantMap().keySet(),
              stateModelDefnullcurrentStateOutput.getCurrentStateMap(resourceIdpartitionId),
              disabledParticipantsForPartition));
    }
    return partitionMapping;
  }

  
Update a ResourceAssignment with dropped and disabled participants for partitions

Parameters:
cluster cluster snapshot
resourceAssignment current resource assignment
currentStateOutput aggregated current state
stateModelDef state model definition for the resource
  private void mapDroppedAndDisabledPartitions(Cluster cluster,
      ResourceAssignment resourceAssignmentResourceCurrentState currentStateOutput,
      StateModelDefinition stateModelDef) {
    // get the total partition set: mapped and current state
    ResourceId resourceId = resourceAssignment.getResourceId();
    Set<PartitionIdmappedPartitions = Sets.newHashSet();
    mappedPartitions.addAll(currentStateOutput.getCurrentStateMappedPartitions(resourceId));
    mappedPartitions.addAll(resourceAssignment.getMappedPartitionIds());
    for (PartitionId partitionId : mappedPartitions) {
      // for each partition, get the dropped and disabled mappings
      Set<ParticipantIddisabledParticipants =
          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
              partitionId);
      // get the error participants
      Map<ParticipantIdStatecurrentStateMap =
          currentStateOutput.getCurrentStateMap(resourceIdpartitionId);
      Set<ParticipantIderrorParticipants = Sets.newHashSet();
      for (ParticipantId participantId : currentStateMap.keySet()) {
        State state = currentStateMap.get(participantId);
        if (state.equals(State.from(.))) {
          errorParticipants.add(participantId);
        }
      }
      // get the dropped and disabled map
      State initialState = stateModelDef.getTypedInitialState();
      Map<ParticipantIdStateparticipantStateMap = resourceAssignment.getReplicaMap(partitionId);
      Set<ParticipantIdparticipants = participantStateMap.keySet();
      Map<ParticipantIdStatedroppedAndDisabledMap =
          ConstraintBasedAssignment.dropAndDisablePartitions(currentStateMapparticipants,
              disabledParticipantsinitialState);
      // don't map error participants
      for (ParticipantId participantId : errorParticipants) {
        droppedAndDisabledMap.remove(participantId);
      }
      // save the mappings, overwriting as necessary
      participantStateMap.putAll(droppedAndDisabledMap);
      // include this add step in case the resource assignment did not already map this partition
      resourceAssignment.addReplicaMap(partitionIdparticipantStateMap);
    }
  }
  private BestPossibleStateOutput compute(Cluster clusterClusterEvent event,
      Map<ResourceIdResourceConfigresourceMapResourceCurrentState currentStateOutput) {
    Map<StateModelDefIdStateModelDefinitionstateModelDefs = cluster.getStateModelMap();
    for (ResourceId resourceId : resourceMap.keySet()) {
      if (.isDebugEnabled()) {
        .debug("Processing resource:" + resourceId);
      }
      ResourceConfig resourceConfig = resourceMap.get(resourceId);
      RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
      RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
      StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
      ResourceAssignment resourceAssignment = null;
      if (rebalancerConfig != null) {
        HelixRebalancer rebalancer = rebalancerConfig.getRebalancer();
        HelixManager manager = event.getAttribute("helixmanager");
        if (rebalancer == null) {
          rebalancer = new FallbackRebalancer();
        }
        rebalancer.init(manager);
        resourceAssignment =
            rebalancer.computeResourceMapping(rebalancerConfigclustercurrentStateOutput);
      }
      if (resourceAssignment == null) {
        resourceAssignment =
            mapDroppedResource(clusterresourceIdcurrentStateOutputstateModelDef);
      } else {
        mapDroppedAndDisabledPartitions(clusterresourceAssignmentcurrentStateOutput,
            stateModelDef);
      }
      output.setResourceAssignment(resourceIdresourceAssignment);
    }
    return output;
  }
// GrepCode page footer (not part of the source file): "New to GrepCode? Check out our FAQ"