Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.helix.manager.zk;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import java.util.List;
 
 
 public class ZKHelixManager implements HelixManagerIZkStateListener {
   private static Logger LOG = Logger.getLogger(ZKHelixManager.class);
 
   public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec
   public static final int MAX_DISCONNECT_THRESHOLD = 5;
   public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
 
   protected final String _zkAddress;
   private final String _clusterName;
   private final String _instanceName;
   private final InstanceType _instanceType;
   private final int _sessionTimeout;
   protected final List<CallbackHandler_handlers;
   private final HelixManagerProperties _properties;

  
helix version#
 
   private final String _version;
 
  protected ZkClient _zkclient = null;
  private final Builder _keyBuilder;
  private volatile String _sessionId;

  
Keep track of timestamps that zk State has become Disconnected If in a _timeWindowLengthMs window zk State has become Disconnected for more than_maxDisconnectThreshold times disconnect the zkHelixManager
  private final List<Long_disconnectTimeHistory = new ArrayList<Long>();
  private final int _flappingTimeWindowMs;
  private final int _maxDisconnectThreshold;

  
participant fields
  private final List<HelixTimerTask_timerTasks = new ArrayList<HelixTimerTask>();

  
controller fields
  protected final List<HelixTimerTask_controllerTimerTasks = new ArrayList<HelixTimerTask>();

  
status dump timer-task
  static class StatusDumpTask extends HelixTimerTask {
    Timer _timer = null;
    final ZkClient zkclient;
    public StatusDumpTask(ZkClient zkclientHelixManager helixController) {
      this. = zkclient;
      this. = helixController;
    }
    @Override
    public void start() {
      long initialDelay = 30 * 60 * 1000;
      long period = 120 * 60 * 1000;
      int timeThresholdNoChange = 180 * 60 * 1000;
      if ( == null) {
        .info("Start StatusDumpTask");
         = new Timer("StatusDumpTimerTask"true);
            timeThresholdNoChange), initialDelayperiod);
      }
    }
    @Override
    public void stop() {
      if ( != null) {
        .info("Stop StatusDumpTask");
        .cancel();
         = null;
      }
    }
  }
  public ZKHelixManager(String clusterNameString instanceNameInstanceType instanceType,
      String zkAddress) {
    .info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
        + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
     = zkAddress;
     = clusterName;
     = instanceType;
    if (instanceName == null) {
      try {
        instanceName =
            InetAddress.getLocalHost().getCanonicalHostName() + "-" + instanceType.toString();
      } catch (UnknownHostException e) {
        // can ignore it
        .info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable"e);
        instanceName = "UNKNOWN";
      }
    }
     = instanceName;
     = new HelixManagerProperties("cluster-manager-version.properties");
     = new Builder(clusterName);
     = new DefaultMessagingService(this);

    
use system property if available
        getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
            .);
        getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
        getSystemPropertyAsInt("zk.session.timeout".);

    
instance type specific init
    switch (instanceType) {
    case :
          new ParticipantHealthReportCollectorImpl(this);
      break;
    case :
       = null;
      break;
          new ParticipantHealthReportCollectorImpl(this);
      break;
    case :
    case :
       = null;
      break;
    default:
      throw new IllegalArgumentException("unrecognized type: " + instanceType);
    }
  }
  private int getSystemPropertyAsInt(String propertyKeyint propertyDefaultValue) {
    String valueString = System.getProperty(propertyKey"" + propertyDefaultValue);
    try {
      int value = Integer.parseInt(valueString);
      if (value > 0) {
        return value;
      }
    } catch (NumberFormatException e) {
      .warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
          + ", using default value: " + propertyDefaultValue);
    }
    return propertyDefaultValue;
  }
  public boolean removeListener(PropertyKey keyObject listener) {
    .info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
        +  + " by instance: " + );
    synchronized (this) {
      List<CallbackHandlertoRemove = new ArrayList<CallbackHandler>();
      for (CallbackHandler handler : ) {
        // compare property-key path and listener reference
        if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) {
          toRemove.add(handler);
        }
      }
      .removeAll(toRemove);
      // handler.reset() may modify the handlers list, so do it outside the iteration
      for (CallbackHandler handler : toRemove) {
        handler.reset();
      }
    }
    return true;
  }
  void checkConnected() {
    if (!isConnected()) {
      throw new HelixException("HelixManager is not connected. Call HelixManager#connect()");
    }
  }
  void addListener(Object listenerPropertyKey propertyKeyChangeType changeType,
      EventType[] eventType) {
    PropertyType type = propertyKey.getType();
    synchronized (this) {
      for (CallbackHandler handler : ) {
        // compare property-key path and listener reference
        if (handler.getPath().equals(propertyKey.getPath())
            && handler.getListener().equals(listener)) {
          .info("Listener: " + listener + " on path: " + propertyKey.getPath()
              + " already exists. skip add");
          return;
        }
      }
      CallbackHandler newHandler =
          new CallbackHandler(thispropertyKeylistenereventTypechangeType);
      .add(newHandler);
      .info("Added listener: " + listener + " for type: " + type + " to path: "
          + newHandler.getPath());
    }
  }
  public void addIdealStateChangeListener(final IdealStateChangeListener listenerthrows Exception {
        new EventType[] {
        });
  }
        new EventType[] {
            .
        });
  }
  public void addConfigChangeListener(ConfigChangeListener listenerthrows Exception {
        new EventType[] {
          .
        });
  }
      throws Exception {
        new EventType[] {
          .
        });
  }
      throws Exception {
    Builder keyBuilder = new Builder();
    PropertyKey propertyKey = null;
    switch (scope) {
    case :
      propertyKey = keyBuilder.clusterConfigs();
      break;
    case :
      propertyKey = keyBuilder.instanceConfigs();
      break;
    case :
      propertyKey = keyBuilder.resourceConfigs();
      break;
    default:
      break;
    }
    if (propertyKey != null) {
      addListener(listenerpropertyKey.new EventType[] {
      });
    } else {
      .error("Can't add listener to config scope: " + scope);
    }
  }
  // TODO: Decide if do we still need this since we are exposing
  // ClusterMessagingService
  public void addMessageListener(MessageListener listenerString instanceName) {
    addListener(listenernew Builder().messages(instanceName), .,
        new EventType[] {
        });
  }
  public void addControllerMessageListener(MessageListener listener) {
        .new EventType[] {
        });
  }
      String instanceNameString sessionIdthrows Exception {
    addListener(listenernew Builder().currentStates(instanceNamesessionId),
        .new EventType[] {
        });
  }
  public void addHealthStateChangeListener(HealthStateChangeListener listenerString instanceName)
      throws Exception {
    addListener(listenernew Builder().healthReports(instanceName), .,
        new EventType[] {
        });
  }
        new EventType[] {
        });
  }
  public void addControllerListener(ControllerChangeListener listener) {
        new EventType[] {
        });
  }
    return ;
  }
    return ;
  }
  public String getClusterName() {
    return ;
  }
  public String getInstanceName() {
    return ;
  }
    ZkBaseDataAccessor<ZNRecordbaseDataAccessor = new ZkBaseDataAccessor<ZNRecord>();
    return baseDataAccessor;
  }
  void createClient() throws Exception {
    PathBasedZkSerializer zkSerializer =
        ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
     =
    int retryCount = 0;
    while (retryCount < 3) {
      try {
        handleNewSession();
        break;
      } catch (HelixException e) {
        .error("fail to createClient."e);
        throw e;
      } catch (Exception e) {
        retryCount++;
        .error("fail to createClient. retry " + retryCounte);
        if (retryCount == 3) {
          throw e;
        }
      }
    }
  }
  public void connect() throws Exception {
    .info("ClusterManager.connect()");
    if (isConnected()) {
      .warn("Cluster manager: " +  + " for cluster: " + 
          + " already connected. skip connect");
      return;
    }
    try {
      createClient();
    } catch (Exception e) {
      .error("fail to connect " + e);
      disconnect();
      throw e;
    }
  }
  public void disconnect() {
    if ( == null) {
      .info("instanceName: " +  + " already disconnected");
      return;
    }
    .info("disconnect " +  + "(" +  + ") from " + );
    try {
      
stop all timer tasks
      stopTimerTasks();

      
shutdown thread pool first to avoid reset() being invoked in the middle of state transition
      // TODO reset user defined handlers only
      resetHandlers();
      .shutdown();
      if ( != null) {
        .reset();
      }
    } finally {
      .close();
       = null;
      .info("Cluster manager: " +  + " disconnected");
    }
  }
  public String getSessionId() {
    return ;
  }
  public boolean isConnected() {
    if ( == null) {
      return false;
    }
    ZkConnection zkconnection = (ZkConnection.getConnection();
    if (zkconnection != null) {
      States state = zkconnection.getZookeeperState();
      return state == .;
    }
    return false;
  }
  public long getLastNotificationTime() {
    return 0;
  }
  public void addPreConnectCallback(PreConnectCallback callback) {
    .info("Adding preconnect callback: " + callback);
    .add(callback);
  }
  public boolean isLeader() {
      return false;
    }
    if (!isConnected()) {
      return false;
    }
    try {
      if (leader != null) {
        String leaderName = leader.getInstanceName();
        String sessionId = leader.getSessionId();
        if (leaderName != null && leaderName.equals() && sessionId != null
            && sessionId.equals()) {
          return true;
        }
      }
    } catch (Exception e) {
      // log
    }
    return false;
  }
  public synchronized ZkHelixPropertyStore<ZNRecordgetHelixPropertyStore() {
    if ( == null) {
      String path = PropertyPathConfig.getPath(.);
       =
          new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(), path,
              null);
    }
    return ;
  }
  public synchronized HelixAdmin getClusterManagmentTool() {
    if ( != null) {
      return new ZKHelixAdmin();
    }
    .error("Couldn't get ZKClusterManagementTool because zkclient is null");
    return null;
  }
    // The caller can register message handler factories on messaging service before the
    // helix manager is connected. Thus we do not do connected check here.
    return ;
  }
  }
    return ;
  }
  public String getVersion() {
    return ;
  }
    return ;
  }
    return ;
  }
  // TODO: rename this and not expose this function as part of interface
  public void startTimerTasks() {
    for (HelixTimerTask task : ) {
      task.start();
    }
  }
  public void stopTimerTasks() {
    for (HelixTimerTask task : ) {
      task.stop();
    }
  }
  public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
     = liveInstanceInfoProvider;
  }

  
wait until we get a non-zero session-id. note that we might lose zkconnection right after we read session-id. but it's ok to get stale session-id and we will have another handle-new-session callback to correct this.
  void waitUntilConnected() {
    boolean isConnected;
    do {
      isConnected =
      if (!isConnected) {
        .error("fail to connect zkserver: " +  + " in "
            + . + "ms. expiredSessionId: " + 
            + ", clusterName: " + );
        continue;
      }
      ZkConnection zkConnection = ((ZkConnection.getConnection());
       = Long.toHexString(zkConnection.getZookeeper().getSessionId());

      
at the time we read session-id, zkconnection might be lost again wait until we get a non-zero session-id
    } while ("0".equals());
    .info("Handling new session, session id: " +  + ", instance: " + 
        + ", instanceTye: " +  + ", cluster: " +  + ", zkconnection: "
        + ((ZkConnection.getConnection()).getZookeeper());
  }
  void initHandlers(List<CallbackHandlerhandlers) {
    synchronized (this) {
      if (handlers != null) {
        for (CallbackHandler handler : handlers) {
          handler.init();
          .info("init handler: " + handler.getPath() + ", " + handler.getListener());
        }
      }
    }
  }
  void resetHandlers() {
    synchronized (this) {
      if ( != null) {
        // get a copy of the list and iterate over the copy list
        // in case handler.reset() modify the original handler list
        List<CallbackHandlertmpHandlers = new ArrayList<CallbackHandler>();
        tmpHandlers.addAll();
        for (CallbackHandler handler : tmpHandlers) {
          handler.reset();
          .info("reset handler: " + handler.getPath() + ", " + handler.getListener());
        }
      }
    }
  }

  
If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous _timeWindowLengthMs Ms time window, we think that there are something wrong going on and disconnect the zkHelixManager from zk.
  boolean isFlapping() {
    if (.size() == 0) {
      return false;
    }
    long mostRecentTimestamp = .get(.size() - 1);
    // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago
    while ((.get(0) + ) < mostRecentTimestamp) {
    }
  }
  public void handleStateChanged(KeeperState statethrows Exception {
    switch (state) {
    case :
      ZkConnection zkConnection = (ZkConnection.getConnection();
      .info("KeeperState: " + state + ", zookeeper:" + zkConnection.getZookeeper());
      break;
    case :
      .info("KeeperState:" + state + ", disconnectedSessionId: " +  + ", instance: "
          +  + ", type: " + );

      
Track the time stamp that the disconnected happens, then check history and see if we should disconnect the helix-manager
      if (isFlapping()) {
        .error("instanceName: " +  + " is flapping. diconnect it. "
            + " maxDisconnectThreshold: " +  + " disconnects in "
            +  + "ms.");
        disconnect();
      }
      break;
    case :
      .info("KeeperState:" + state + ", expiredSessionId: " +  + ", instance: "
          +  + ", type: " + );
      break;
    default:
      break;
    }
  }
  public void handleNewSession() throws Exception {
    waitUntilConnected();

    
stop all timer tasks, reset all handlers, make sure cleanup completed for previous session disconnect if fail to cleanup
    if ( != null) {
    }
    resetHandlers();

    
clean up write-through cache
    .reset();

    
from here on, we are dealing with new session
    if (!ZKUtil.isClusterSetup()) {
      throw new HelixException("Cluster structure is not set up for cluster: " + );
    }
    switch () {
    case :
      break;
    case :
      break;
      break;
    case :
    case :
    default:
      break;
    }
    startTimerTasks();

    
init handlers ok to init message handler and data-accessor twice the second init will be skipped (see CallbackHandler)
  }
  void handleNewSessionAsParticipant() throws Exception {
    
auto-join
    ParticipantManagerHelper participantHelper =
    participantHelper.joinCluster();

    
Invoke PreConnectCallbacks
    for (PreConnectCallback callback : ) {
      callback.onPreConnect();
    }
    participantHelper.createLiveInstance();
    participantHelper.carryOverPreviousCurrentState();

    
setup message listener
    participantHelper.setupMsgHandler();

    
start health check timer task
    participantHelper.createHealthCheckPath();
  }
    if ( != null) {
    } else {
          new CallbackHandler(this.controller(),
              new DistributedLeaderElection(this),
              new EventType[] {
              }, .);
    }
  }
New to GrepCode? Check out our FAQ X