Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.helix.controller;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import java.util.List;
 import java.util.Map;
 
The Cluster Controller's main goal is to keep the cluster state as close as possible to the Ideal State. It does this by listening to changes in cluster state and scheduling new tasks to move the cluster state to the best possible ideal state. Each instance of this class can control only one cluster. Get all the partitions using IdealState, CurrentState and Messages
foreach partition
1. get the (instance,state) from IdealState, CurrentState and PendingMessages
2. compute best possible state (instance,state) pair. This needs previous step data and state model constraints
3. compute the messages/tasks needed to move from 1 to 2
4. select the messages that can be sent, needs messages and state model constraints
5. send messages
 
   // Class-wide logger, named after this class.
   private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName());
   // NOTE(review): not read anywhere in this listing; purpose unclear — confirm or remove.
   volatile boolean init = false;
   // Registry of pipelines keyed by cluster event name (see createDefaultRegistry()).
   private final PipelineRegistry _registry;
 
 
The _paused flag is checked by handleEvent(); while the flag is set, handleEvent() is a no-op. All other event-handling logic stays the same when the flag is set.
  // Pause flag. NOTE(review): presumably the field toggled in onControllerChange(); the
  // assignment targets there are stripped in this listing — confirm.
  private boolean _paused;

  
The timer that can periodically run the rebalancing pipeline. The timer is started if at least one resource group is configured to use it.
  // Stays null until a rebalancing timer is started.
  Timer _rebalanceTimer = null;
Default constructor that creates a default pipeline registry. This is sufficient in most cases; if something specific is needed, use the other constructor, which accepts a pipeline registry.
  public GenericHelixController() {
    this(createDefaultRegistry());
  }
  /**
   * Timer task that pushes a synthetic "periodicalRebalance" event through handleEvent().
   *
   * <p>Declared as an inner (non-static) class because run() calls the enclosing controller's
   * handleEvent().
   *
   * <p>NOTE(review): in this listing the field assigned by the constructor has been stripped, and
   * the NotificationContext below is created without a manager argument. If getManager() then
   * returns null, handleEvent() logs an error and returns without running any pipeline —
   * confirm the constructor argument against the upstream source.
   */
  class RebalanceTask extends TimerTask {
    // Keeps the manager for later use (assignment target is stripped in this listing).
    public RebalanceTask(HelixManager manager) {
       = manager;
    }
    @Override
    public void run() {
      NotificationContext changeContext = new NotificationContext();
      // NOTE(review): the notification type constant is stripped in this listing.
      changeContext.setType(..);
      ClusterEvent event = new ClusterEvent("periodicalRebalance");
      event.addAttribute("helixmanager"changeContext.getManager());
      event.addAttribute("changeContext"changeContext);
      // Empty placeholder payload: a timer tick carries no change data.
      List<ZNRecorddummy = new ArrayList<ZNRecord>();
      event.addAttribute("eventData"dummy);
      // Should be able to process
      handleEvent(event);
    }
  }
  // TODO who should stop this timer
  
Starts the rebalancing timer with the specified period. The timer is started if necessary; if the period is smaller than the current period, the current timer is cancelled and the new period is used.
  /**
   * Starts, or restarts with a shorter period, the rebalancing timer.
   *
   * <p>When {@code period} is below the stored period (the compared field is stripped in this
   * listing — presumably the current timer period), any existing timer is cancelled, a new
   * daemon {@code Timer(true)} is created and the period is stored. Otherwise only a log line is
   * written and the existing timer is kept.
   *
   * <p>NOTE(review): as listed, nothing is ever scheduled on the new timer and {@code manager}
   * is unused. Presumably a {@code new RebalanceTask(manager)} should be scheduled here (e.g.
   * via {@code scheduleAtFixedRate}); confirm against the upstream source.
   *
   * @param period requested timer period (units not shown here; java.util.Timer uses ms)
   * @param manager manager intended for the scheduled rebalance task
   */
  void startRebalancingTimer(int periodHelixManager manager) {
    .info("Controller starting timer at period " + period);
    if (period < ) {
      if ( != null) {
        .cancel();
      }
       = new Timer(true);
       = period;
    } else {
      .info("Controller already has timer at period " + );
    }
  }

  
Stops the rebalancing timer.
    // NOTE(review): this method's declaration line is missing from the listing (presumably
    // stopRebalancingTimer(), which handleEvent() calls). The body appears to cancel a timer
    // field whose name is stripped and then clear the reference.
    if ( != null) {
      .cancel();
       = null;
    }
  }
  private static PipelineRegistry createDefaultRegistry() {
    .info("createDefaultRegistry");
    synchronized (GenericHelixController.class) {
      PipelineRegistry registry = new PipelineRegistry();
      // cluster data cache refresh
      Pipeline dataRefresh = new Pipeline();
      dataRefresh.addStage(new ReadClusterDataStage());
      // rebalance pipeline
      Pipeline rebalancePipeline = new Pipeline();
      rebalancePipeline.addStage(new CompatibilityCheckStage());
      rebalancePipeline.addStage(new ResourceComputationStage());
      rebalancePipeline.addStage(new CurrentStateComputationStage());
      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
      rebalancePipeline.addStage(new PersistAssignmentStage());
      rebalancePipeline.addStage(new MessageGenerationStage());
      rebalancePipeline.addStage(new MessageSelectionStage());
      rebalancePipeline.addStage(new MessageThrottleStage());
      rebalancePipeline.addStage(new TaskAssignmentStage());
      // external view generation
      Pipeline externalViewPipeline = new Pipeline();
      externalViewPipeline.addStage(new ExternalViewComputeStage());
      registry.register("idealStateChange"dataRefreshrebalancePipeline);
      registry.register("currentStateChange"dataRefreshrebalancePipelineexternalViewPipeline);
      registry.register("configChange"dataRefreshrebalancePipeline);
      registry.register("liveInstanceChange"dataRefreshrebalancePipelineexternalViewPipeline);
      registry.register("messageChange"dataRefreshrebalancePipeline);
      registry.register("externalView"dataRefresh);
      registry.register("resume"dataRefreshrebalancePipelineexternalViewPipeline);
      registry
          .register("periodicalRebalance"dataRefreshrebalancePipelineexternalViewPipeline);
      return registry;
    }
  }
     // NOTE(review): the constructor header and assignment targets are missing from this
     // listing; presumably this is the registry-accepting constructor that clears the pause
     // flag and stores the registry.
     = false;
     = registry;
  }

  
lock-always: the caller must always obtain an external lock before calling; calls to handleEvent() should be serialized.

Parameters:
event
  /**
   * Runs every pipeline registered for the event's name, in registration order.
   *
   * <p>Declared {@code synchronized}, so calls are serialized on this controller instance.
   * Returns early, after logging, when the event has no "helixmanager" attribute, when that
   * manager is not the leader, or when the pause flag is set (its name is stripped here).
   *
   * <p>If the event's notification context has the finalize type (the compared constant is
   * stripped in this listing; the log message says FINALIZE), it resets and clears the status
   * monitor and stops the rebalancing timer instead of running pipelines. Otherwise it lazily
   * creates a ClusterStatusMonitor and attaches it to the event.
   *
   * <p>If a pipeline throws, the exception is logged and the remaining pipelines are skipped.
   *
   * @param event the cluster event to process
   */
  protected synchronized void handleEvent(ClusterEvent event) {
    HelixManager manager = event.getAttribute("helixmanager");
    if (manager == null) {
      .error("No cluster manager in event:" + event.getName());
      return;
    }
    if (!manager.isLeader()) {
      .error("Cluster manager: " + manager.getInstanceName()
          + " is not leader. Pipeline will not be invoked");
      return;
    }
    if () {
      .info("Cluster is paused. Ignoring the event:" + event.getName());
      return;
    }
    NotificationContext context = null;
    if (event.getAttribute("changeContext") != null) {
      context = (NotificationContext) (event.getAttribute("changeContext"));
    }
    // Initialize _clusterStatusMonitor
    if (context != null) {
      if (context.getType() == .) {
        // Teardown path: drop the monitor and stop the timer; no pipelines run.
        if ( != null) {
          .reset();
           = null;
        }
        stopRebalancingTimer();
        .info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
        return;
      } else {
        if ( == null) {
           = new ClusterStatusMonitor(manager.getClusterName());
        }
        // NOTE(review): the attribute value (presumably the monitor) is missing in this listing.
        event.addAttribute("clusterStatusMonitor");
      }
    }
    List<Pipelinepipelines = .getPipelinesForEvent(event.getName());
    if (pipelines == null || pipelines.size() == 0) {
      .info("No pipeline to run for event:" + event.getName());
      return;
    }
    for (Pipeline pipeline : pipelines) {
      try {
        pipeline.handle(event);
        pipeline.finish();
      } catch (Exception e) {
        // A failed pipeline aborts the rest: later pipelines may depend on its output.
        .error("Exception while executing pipeline: " + pipeline
            + ". Will not continue to next pipeline"e);
        break;
      }
    }
  }
  // TODO since we read data in pipeline, we can get rid of reading from zookeeper in
  // callback
  public void onExternalViewChange(List<ExternalViewexternalViewList,
      NotificationContext changeContext) {
    // logger.info("START: GenericClusterController.onExternalViewChange()");
    // ClusterEvent event = new ClusterEvent("externalViewChange");
    // event.addAttribute("helixmanager", changeContext.getManager());
    // event.addAttribute("changeContext", changeContext);
    // event.addAttribute("eventData", externalViewList);
    // // handleEvent(event);
    // logger.info("END: GenericClusterController.onExternalViewChange()");
  }
  public void onStateChange(String instanceNameList<CurrentStatestatesInfo,
      NotificationContext changeContext) {
    .info("START: GenericClusterController.onStateChange()");
    ClusterEvent event = new ClusterEvent("currentStateChange");
    event.addAttribute("helixmanager"changeContext.getManager());
    event.addAttribute("instanceName"instanceName);
    event.addAttribute("changeContext"changeContext);
    event.addAttribute("eventData"statesInfo);
    handleEvent(event);
    .info("END: GenericClusterController.onStateChange()");
  }
  public void onHealthChange(String instanceNameList<HealthStatreports,
      NotificationContext changeContext) {
    
When there are more participant ( > 20, can be in hundreds), This callback can be called quite frequently as each participant reports health stat every minute. Thus we change the health check pipeline to run in a timer callback.
  }
  /**
   * Handles a message-list notification for one instance by running the pipelines registered
   * for the "messageChange" event, then records the instance's message count.
   *
   * @param instanceName instance whose messages changed; attached as "instanceName"
   * @param messages the message list; attached as "eventData" (may be null)
   * @param changeContext notification context; it and its manager are attached to the event
   */
  public void onMessage(String instanceNameList<Messagemessages,
      NotificationContext changeContext) {
    .info("START: GenericClusterController.onMessage()");
    ClusterEvent event = new ClusterEvent("messageChange");
    event.addAttribute("helixmanager"changeContext.getManager());
    event.addAttribute("instanceName"instanceName);
    event.addAttribute("changeContext"changeContext);
    event.addAttribute("eventData"messages);
    handleEvent(event);
    // NOTE(review): receiver is stripped in this listing; presumably the cluster status monitor,
    // which handleEvent() may have created or cleared — hence the null check.
    if ( != null && messages != null) {
      .addMessageQueueSize(instanceNamemessages.size());
    }
    .info("END: GenericClusterController.onMessage()");
  }
  /**
   * Handles a live-instance notification. It first updates the per-instance listeners via
   * checkLiveInstancesObservation(), then runs the pipelines registered for the
   * "liveInstanceChange" event.
   *
   * <p>The type constants compared below are stripped in this listing. Per the inline comment,
   * the last branch handles finalize by passing an empty list, which removes all listeners.
   *
   * @param liveInstances current live instances; null is treated as an empty list
   * @param changeContext notification context; it and its manager are attached to the event
   */
  public void onLiveInstanceChange(List<LiveInstanceliveInstances,
      NotificationContext changeContext) {
    .info("START: Generic GenericClusterController.onLiveInstanceChange()");
    if (liveInstances == null) {
      liveInstances = Collections.emptyList();
    }
    // Go through the live instance list and make sure that we are observing them
    // accordingly. The action is done regardless of the paused flag.
    if (changeContext.getType() == ..
        || changeContext.getType() == ..) {
      checkLiveInstancesObservation(liveInstanceschangeContext);
    } else if (changeContext.getType() == ..) {
      // on finalize, should remove all message/current-state listeners
      .info("remove message/current-state listeners. lastSeenInstances: "
          +  + ", lastSeenSessions: " + );
      liveInstances = Collections.emptyList();
      checkLiveInstancesObservation(liveInstanceschangeContext);
    }
    ClusterEvent event = new ClusterEvent("liveInstanceChange");
    event.addAttribute("helixmanager"changeContext.getManager());
    event.addAttribute("changeContext"changeContext);
    event.addAttribute("eventData"liveInstances);
    handleEvent(event);
    .info("END: Generic GenericClusterController.onLiveInstanceChange()");
  }
  void checkRebalancingTimer(HelixManager managerList<IdealStateidealStates) {
    if (manager.getConfigAccessor() == null) {
      .warn(manager.getInstanceName()
          + " config accessor doesn't exist. should be in file-based mode.");
      return;
    }
    for (IdealState idealState : idealStates) {
      int period = idealState.getRebalanceTimerPeriod();
      if (period > 0) {
        startRebalancingTimer(periodmanager);
      }
    }
  }
  /**
   * Handles an ideal-state notification by running the pipelines registered for the
   * "idealStateChange" event. Unless the context has the excluded type (constant stripped in
   * this listing — presumably finalize), it then checks whether a rebalancing timer is needed.
   *
   * @param idealStates the ideal states; attached as "eventData"
   * @param changeContext notification context; it and its manager are attached to the event
   */
  public void onIdealStateChange(List<IdealStateidealStatesNotificationContext changeContext) {
    .info("START: Generic GenericClusterController.onIdealStateChange()");
    ClusterEvent event = new ClusterEvent("idealStateChange");
    event.addAttribute("helixmanager"changeContext.getManager());
    event.addAttribute("changeContext"changeContext);
    event.addAttribute("eventData"idealStates);
    handleEvent(event);
    if (changeContext.getType() != .) {
      checkRebalancingTimer(changeContext.getManager(), idealStates);
    }
    .info("END: Generic GenericClusterController.onIdealStateChange()");
  }
  public void onConfigChange(List<InstanceConfigconfigsNotificationContext changeContext) {
    .info("START: GenericClusterController.onConfigChange()");
    ClusterEvent event = new ClusterEvent("configChange");
    event.addAttribute("changeContext"changeContext);
    event.addAttribute("helixmanager"changeContext.getManager());
    event.addAttribute("eventData"configs);
    handleEvent(event);
    .info("END: GenericClusterController.onConfigChange()");
  }
  /**
   * Handles a controller notification: verifies that this instance is the leader, then syncs
   * the pause flag with the cluster's pause signal.
   *
   * <p>Returns early on the finalize type (constant stripped here; the log says FINALIZE), when
   * no leader exists, or when the leader's name differs from this manager's instance name. If a
   * pause signal exists, the flag is set. Otherwise, if the flag was set, it is cleared and a
   * "resume" event is run through handleEvent(). The flag and its assignments are stripped in
   * this listing.
   *
   * <p>NOTE(review): the first check tolerates a null changeContext, but the code after it
   * dereferences changeContext unconditionally, so a null context would still throw.
   *
   * @param changeContext notification context supplying the manager and data accessor
   */
  public void onControllerChange(NotificationContext changeContext) {
    .info("START: GenericClusterController.onControllerChange()");
    if (changeContext != null && changeContext.getType() == .) {
      .info("GenericClusterController.onControllerChange() FINALIZE");
      return;
    }
    HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
    // double check if this controller is the leader
    Builder keyBuilder = accessor.keyBuilder();
    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
    if (leader == null) {
      
          .warn("No controller exists for cluster:" + changeContext.getManager().getClusterName());
      return;
    } else {
      String leaderName = leader.getInstanceName();
      String instanceName = changeContext.getManager().getInstanceName();
      if (leaderName == null || !leaderName.equals(instanceName)) {
        .warn("leader name does NOT match, my name: " + instanceName + ", leader: " + leader);
        return;
      }
    }
    PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
    if (pauseSignal != null) {
       = true;
      .info("controller is now paused");
    } else {
      if () {
        // currently paused, and the pause signal is gone: resume
        .info("controller is now resumed");
         = false;
        ClusterEvent event = new ClusterEvent("resume");
        event.addAttribute("changeContext"changeContext);
        event.addAttribute("helixmanager"changeContext.getManager());
        // NOTE(review): pauseSignal is always null in this branch.
        event.addAttribute("eventData"pauseSignal);
        handleEvent(event);
      } else {
         = false;
      }
    }
    .info("END: GenericClusterController.onControllerChange()");
  }

  
Go through the list of live instances in the cluster, and add current-state change listeners and message listeners to them if they are newly added. For current-state changes, the observation is tied to the session id of each live instance.
  /**
   * Reconciles the per-instance listeners with the given live-instance list.
   *
   * <p>Compares the current instances and sessions against the last-seen maps. These are held in
   * two holders read with get() and replaced with set(); their names are stripped in this
   * listing. The method:
   * <ul>
   *   <li>removes the current-state listener of every session that disappeared;</li>
   *   <li>removes the message listener of every instance that disappeared;</li>
   *   <li>adds current-state listeners for new sessions and message listeners for new
   *       instances.</li>
   * </ul>
   * Failures to add a listener are logged and skipped. Finally the current maps become the new
   * last-seen maps.
   *
   * @param liveInstances live instances to observe; an empty list removes every listener
   *     tracked by the previous call
   * @param changeContext supplies the HelixManager used to add and remove listeners
   */
  protected void checkLiveInstancesObservation(List<LiveInstanceliveInstances,
      NotificationContext changeContext) {
    // construct maps for current live-instances
    // (curInstances keyed by instance name, curSessions keyed by stringified session id)
    Map<StringLiveInstancecurInstances = new HashMap<StringLiveInstance>();
    Map<StringLiveInstancecurSessions = new HashMap<StringLiveInstance>();
    for (LiveInstance liveInstance : liveInstances) {
      curInstances.put(liveInstance.getInstanceName(), liveInstance);
      curSessions.put(liveInstance.getTypedSessionId().stringify(), liveInstance);
    }
    // Last-seen maps from the previous call; null on the first call.
    Map<StringLiveInstancelastInstances = .get();
    Map<StringLiveInstancelastSessions = .get();
    HelixManager manager = changeContext.getManager();
    Builder keyBuilder = new Builder(manager.getClusterName());
    if (lastSessions != null) {
      for (String session : lastSessions.keySet()) {
        if (!curSessions.containsKey(session)) {
          // remove current-state listener for expired session
          String instanceName = lastSessions.get(session).getInstanceName();
          manager.removeListener(keyBuilder.currentStates(instanceNamesession), this);
        }
      }
    }
    if (lastInstances != null) {
      for (String instance : lastInstances.keySet()) {
        if (!curInstances.containsKey(instance)) {
          // remove message listener for disconnected instances
          manager.removeListener(keyBuilder.messages(instance), this);
        }
      }
    }
    for (String session : curSessions.keySet()) {
      if (lastSessions == null || !lastSessions.containsKey(session)) {
        String instanceName = curSessions.get(session).getInstanceName();
        try {
          // add current-state listeners for new sessions
          manager.addCurrentStateChangeListener(thisinstanceNamesession);
          .info(manager.getInstanceName() + " added current-state listener for instance: "
              + instanceName + ", session: " + session + ", listener: " + this);
        } catch (Exception e) {
          // best effort: log and continue with the remaining sessions
          .error("Fail to add current state listener for instance: " + instanceName
              + " with session: " + sessione);
        }
      }
    }
    for (String instance : curInstances.keySet()) {
      if (lastInstances == null || !lastInstances.containsKey(instance)) {
        try {
          // add message listeners for new instances
          manager.addMessageListener(thisinstance);
          .info(manager.getInstanceName() + " added message listener for " + instance
              + ", listener: " + this);
        } catch (Exception e) {
          // best effort: log and continue with the remaining instances
          .error("Fail to add message listener for instance: " + instancee);
        }
      }
    }
    // update last-seen
    .set(curInstances);
    .set(curSessions);
  }
New to GrepCode? Check out our FAQ X