Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.helix.manager.zk;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
 import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
 import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
 import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
 import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
 import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
 import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
 
 import java.util.List;
 import java.util.Map;
 
This is a copy of CallbackHandler We need to synchronize on ZkHelixConnection instead ofHelixManager to avoid dead-lock. Otherwise an example deadlock scenario would be: 1) main-thread calls ZkHelixConnection#disconnect(), results in: - ZkHelixController#reset(), holding ZkHelixConnection, waiting HelixConnectionAdaptor 2) zk-event-thread calls CallbackHandler#handleChildChange(), results in: - CallbackHandler#invoke(), holding HelixConnectionAdaptor, waiting ZkHelixConnection TODO remove code duplication
 
 public class ZkCallbackHandler implements IZkChildListenerIZkDataListener
 
 {
   private static Logger logger = Logger.getLogger(ZkCallbackHandler.class);

  
define the next possible notification types
 
   private static Map<TypeList<Type>> nextNotificationType = new HashMap<TypeList<Type>>();
   static {
   }
 
   private final String _path;
   private final Object _listener;
   private final EventType[] _eventTypes;
  private final ChangeType _changeType;
  private final ZkClient _zkClient;
  private final HelixRole _role;
  private final HelixManager _manager;
  private final String _instanceName;
  private final HelixConnection _connection;
  private final HelixDataAccessor _accessor;
  private final PropertyKey _propertyKey;

  
maintain the expected notification types this is fix for HELIX-195: race condition between FINALIZE callbacks and Zk callbacks
  public ZkCallbackHandler(HelixRole roleZkClient client,
      PropertyKey propertyKey,
      Object listenerEventType[] eventTypesChangeType changeType) {
    if (listener == null) {
      throw new HelixException("listener could not be null");
    }
     = role;
     = new HelixConnectionAdaptor(role);
     = role.getId().stringify();
     = role.getConnection();
     = client;
     = propertyKey;
     = propertyKey.getPath();
     = listener;
     = eventTypes;
     = changeType;
    init();
  }
  public Object getListener() {
    return ;
  }
  public String getPath() {
    return ;
  }
  public void invoke(NotificationContext changeContextthrows Exception {
    // This allows the listener to work with one change at a time
    synchronized () {
      Type type = changeContext.getType();
      if (!.contains(type)) {
        .warn("Skip processing callbacks for listener: " +  + ", path: " + 
            + ", expected types: " +  + " but was " + type);
        return;
      }
      // Builder keyBuilder = _accessor.keyBuilder();
      long start = System.currentTimeMillis();
      if (.isInfoEnabled()) {
        .info(Thread.currentThread().getId() + " START:INVOKE " +  + " listener:"
            + .getClass().getCanonicalName());
      }
      if ( == ) {
        IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener;
        subscribeForChanges(changeContexttruetrue);
        List<IdealStateidealStates = .getChildValues();
        idealStateChangeListener.onIdealStateChange(idealStateschangeContext);
      } else if ( == .) {
        subscribeForChanges(changeContexttruetrue);
        if ( instanceof ConfigChangeListener) {
          ConfigChangeListener configChangeListener = (ConfigChangeListener;
          List<InstanceConfigconfigs = .getChildValues();
          configChangeListener.onConfigChange(configschangeContext);
        } else if ( instanceof InstanceConfigChangeListener) {
          List<InstanceConfigconfigs = .getChildValues();
          listener.onInstanceConfigChange(configschangeContext);
        }
      } else if ( == ) {
        subscribeForChanges(changeContexttruetrue);
        listener.onConfigChange(configschangeContext);
      } else if ( == ) {
        LiveInstanceChangeListener liveInstanceChangeListener =
            (LiveInstanceChangeListener;
        subscribeForChanges(changeContexttruetrue);
        List<LiveInstanceliveInstances = .getChildValues();
        liveInstanceChangeListener.onLiveInstanceChange(liveInstanceschangeContext);
      } else if ( == ) {
        CurrentStateChangeListener currentStateChangeListener =
            (CurrentStateChangeListener;
        subscribeForChanges(changeContexttruetrue);
        String instanceName = PropertyPathConfig.getInstanceNameFromPath();
        List<CurrentStatecurrentStates = .getChildValues();
        currentStateChangeListener.onStateChange(instanceNamecurrentStateschangeContext);
      } else if ( == ) {
        MessageListener messageListener = (MessageListener;
        subscribeForChanges(changeContexttruefalse);
        String instanceName = PropertyPathConfig.getInstanceNameFromPath();
        List<Messagemessages = .getChildValues();
        messageListener.onMessage(instanceNamemessageschangeContext);
      } else if ( == ) {
        MessageListener messageListener = (MessageListener;
        subscribeForChanges(changeContexttruefalse);
        List<Messagemessages = .getChildValues();
        messageListener.onMessage(messageschangeContext);
      } else if ( == ) {
        ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener;
        subscribeForChanges(changeContexttruetrue);
        List<ExternalViewexternalViewList = .getChildValues();
        externalViewListener.onExternalViewChange(externalViewListchangeContext);
      } else if ( == .) {
        ControllerChangeListener controllerChangelistener = (ControllerChangeListener;
        subscribeForChanges(changeContexttruefalse);
        controllerChangelistener.onControllerChange(changeContext);
      } else if ( == .) {
        HealthStateChangeListener healthStateChangeListener = (HealthStateChangeListener;
        subscribeForChanges(changeContexttruetrue); // TODO: figure out
        // settings here
        String instanceName = PropertyPathConfig.getInstanceNameFromPath();
        List<HealthStathealthReportList = .getChildValues();
        healthStateChangeListener.onHealthChange(instanceNamehealthReportListchangeContext);
      }
      long end = System.currentTimeMillis();
      if (.isInfoEnabled()) {
        .info(Thread.currentThread().getId() + " END:INVOKE " +  + " listener:"
            + .getClass().getCanonicalName() + " Took: " + (end - start) + "ms");
      }
    }
  }
  private void subscribeChildChange(String pathNotificationContext context) {
    NotificationContext.Type type = context.getType();
    if (type == .. || type == ..) {
      .info( + " subscribes child-change. path: " + path
          + ", listener: " + );
      .subscribeChildChanges(paththis);
    } else if (type == ..) {
      .info( + " unsubscribe child-change. path: " + path
          + ", listener: " + );
      .unsubscribeChildChanges(paththis);
    }
  }
  private void subscribeDataChange(String pathNotificationContext context) {
    NotificationContext.Type type = context.getType();
    if (type == .. || type == ..) {
      if (.isDebugEnabled()) {
        .debug( + " subscribe data-change. path: " + path
            + ", listener: " + );
      }
      .subscribeDataChanges(paththis);
    } else if (type == ..) {
      .info( + " unsubscribe data-change. path: " + path
          + ", listener: " + );
      .unsubscribeDataChanges(paththis);
    }
  }
  // TODO watchParent is always true. consider remove it
  private void subscribeForChanges(NotificationContext contextString pathboolean watchParent,
      boolean watchChild) {
    if (watchParent) {
      subscribeChildChange(pathcontext);
    }
    if (watchChild) {
      try {
        switch () {
        case :
        case :
        case : {
          // check if bucketized
          BaseDataAccessor<ZNRecordbaseAccessor = new ZkBaseDataAccessor<ZNRecord>();
          List<ZNRecordrecords = baseAccessor.getChildren(pathnull, 0);
          for (ZNRecord record : records) {
            HelixProperty property = new HelixProperty(record);
            String childPath = path + "/" + record.getId();
            int bucketSize = property.getBucketSize();
            if (bucketSize > 0) {
              // subscribe both data-change and child-change on bucketized parent node
              // data-change gives a delete-callback which is used to remove watch
              subscribeChildChange(childPathcontext);
              subscribeDataChange(childPathcontext);
              // subscribe data-change on bucketized child
              List<StringbucketizedChildNames = .getChildren(childPath);
              if (bucketizedChildNames != null) {
                for (String bucketizedChildName : bucketizedChildNames) {
                  String bucketizedChildPath = childPath + "/" + bucketizedChildName;
                  subscribeDataChange(bucketizedChildPathcontext);
                }
              }
            } else {
              subscribeDataChange(childPathcontext);
            }
          }
          break;
        }
        default: {
          List<StringchildNames = .getChildren(path);
          if (childNames != null) {
            for (String childName : childNames) {
              String childPath = path + "/" + childName;
              subscribeDataChange(childPathcontext);
            }
          }
          break;
        }
        }
      } catch (ZkNoNodeException e) {
        .warn("fail to subscribe child/data change. path: " + path + ", listener: "
            + e);
      }
    }
  }
  public EventType[] getEventTypes() {
    return ;
  }

  
Invoke the listener so that it sets up the initial values from the zookeeper if any exists
  public void init() {
    try {
      NotificationContext changeContext = new NotificationContext();
      changeContext.setType(..);
      invoke(changeContext);
    } catch (Exception e) {
      String msg = "Exception while invoking init callback for listener:" + ;
      ZKExceptionHandler.getInstance().handle(msge);
    }
  }
  public void handleDataChange(String dataPathObject data) {
    try {
      updateNotificationTime(System.nanoTime());
      if (dataPath != null && dataPath.startsWith()) {
        NotificationContext changeContext = new NotificationContext();
        changeContext.setType(..);
        invoke(changeContext);
      }
    } catch (Exception e) {
      String msg =
          "exception in handling data-change. path: " + dataPath + ", listener: " + ;
      ZKExceptionHandler.getInstance().handle(msge);
    }
  }
  public void handleDataDeleted(String dataPath) {
    try {
      updateNotificationTime(System.nanoTime());
      if (dataPath != null && dataPath.startsWith()) {
        .info( + " unsubscribe data-change. path: " + dataPath
            + ", listener: " + );
        .unsubscribeDataChanges(dataPaththis);
        // only needed for bucketized parent, but OK if we don't have child-change
        // watch on the bucketized parent path
        .info( + " unsubscribe child-change. path: " + dataPath
            + ", listener: " + );
        .unsubscribeChildChanges(dataPaththis);
        // No need to invoke() since this event will handled by child-change on parent-node
        // NotificationContext changeContext = new NotificationContext(_manager);
        // changeContext.setType(NotificationContext.Type.CALLBACK);
        // invoke(changeContext);
      }
    } catch (Exception e) {
      String msg =
          "exception in handling data-delete-change. path: " + dataPath + ", listener: "
              + ;
      ZKExceptionHandler.getInstance().handle(msge);
    }
  }
  public void handleChildChange(String parentPathList<StringcurrentChilds) {
    try {
      updateNotificationTime(System.nanoTime());
      if (parentPath != null && parentPath.startsWith()) {
        NotificationContext changeContext = new NotificationContext();
        if (currentChilds == null) {
          // parentPath has been removed
          if (parentPath.equals()) {
            // _path has been removed, remove this listener
            .removeListener();
          }
          changeContext.setType(..);
        } else {
          changeContext.setType(..);
        }
        invoke(changeContext);
      }
    } catch (Exception e) {
      String msg =
          "exception in handling child-change. instance: " + 
              + ", parentPath: " + parentPath + ", listener: " + ;
      ZKExceptionHandler.getInstance().handle(msge);
    }
  }

  
Invoke the listener for the last time so that the listener could clean up resources
  public void reset() {
    try {
      NotificationContext changeContext = new NotificationContext();
      changeContext.setType(..);
      invoke(changeContext);
    } catch (Exception e) {
      String msg = "Exception while resetting the listener:" + ;
      ZKExceptionHandler.getInstance().handle(msge);
    }
  }
  private void updateNotificationTime(long nanoTime) {
    long l = .get();
    while (nanoTime > l) {
      boolean b = .compareAndSet(lnanoTime);
      if (b) {
        break;
      } else {
        l = .get();
      }
    }
  }
New to GrepCode? Check out our FAQ X