Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.helix.manager.zk;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import java.util.List;
 import java.util.Map;
 
 
 public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
   private static final Logger LOG = Logger.getLogger(ZkCacheBaseDataAccessor.class);
 
   protected WriteThroughCache<T> _wtCache;
   protected ZkCallbackCache<T> _zkCache;
 
   final Map<StringCache<T>> _cacheMap;
 
   final String _chrootPath;
   final List<String_wtCachePaths;
   final List<String_zkCachePaths;
 
   final HelixGroupCommit<T> _groupCommit = new HelixGroupCommit<T>();
 
   // fire listeners
   private final ReentrantLock _eventLock = new ReentrantLock();
 
   private ZkClient _zkclient = null;
 
   public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessorList<StringwtCachePaths) {
     this(baseAccessornullwtCachePathsnull);
   }
 
   public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessorString chrootPath,
       List<StringwtCachePathsList<StringzkCachePaths) {
      = baseAccessor;
 
     if (chrootPath == null || chrootPath.equals("/")) {
        = null;
     } else {
       PathUtils.validatePath(chrootPath);
        = chrootPath;
     }
 
      = wtCachePaths;
      = zkCachePaths;
 
     // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
     // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
     // comes first
      = new TreeMap<StringCache<T>>(new Comparator<String>() {
       @Override
       public int compare(String o1String o2) {
         int len1 = o1.split("/").length;
         int len2 = o2.split("/").length;
         return len1 - len2;
       }
     });
 
     start();
   }
  public ZkCacheBaseDataAccessor(String zkAddressZkSerializer serializerString chrootPath,
      List<StringwtCachePathsList<StringzkCachePaths) {
     =
        new ZkClient(zkAddress.,
            .serializer);
    if (chrootPath == null || chrootPath.equals("/")) {
       = null;
    } else {
      PathUtils.validatePath(chrootPath);
       = chrootPath;
    }
     = wtCachePaths;
     = zkCachePaths;
    // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
    // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
    // comes first
     = new TreeMap<StringCache<T>>(new Comparator<String>() {
      @Override
      public int compare(String o1String o2) {
        int len1 = o1.split("/").length;
        int len2 = o2.split("/").length;
        return len1 - len2;
      }
    });
    start();
  }
  private String prependChroot(String clientPath) {
    PathUtils.validatePath(clientPath);
    if ( != null) {
      // handle clientPath = "/"
      if (clientPath.length() == 1) {
        return ;
      }
      return  + clientPath;
    } else {
      return clientPath;
    }
  }
  private List<StringprependChroot(List<StringclientPaths) {
    List<StringserverPaths = new ArrayList<String>();
    for (String clientPath : clientPaths) {
      serverPaths.add(prependChroot(clientPath));
    }
    return serverPaths;
  }

  
find the first path in paths that is a descendant
  private String firstCachePath(List<Stringpaths) {
    for (String cachePath : .keySet()) {
      for (String path : paths) {
        if (path.startsWith(cachePath)) {
          return path;
        }
      }
    }
    return null;
  }
  private Cache<T> getCache(String path) {
    for (String cachePath : .keySet()) {
      if (path.startsWith(cachePath)) {
        return .get(cachePath);
      }
    }
    return null;
  }
  private Cache<T> getCache(List<Stringpaths) {
    Cache<T> cache = null;
    for (String path : paths) {
      for (String cachePath : .keySet()) {
        if (cache == null && path.startsWith(cachePath)) {
          cache = .get(cachePath);
        } else if (cache != null && cache != .get(cachePath)) {
          throw new IllegalArgumentException("Couldn't do cross-cache async operations. paths: "
              + paths);
        }
      }
    }
    return cache;
  }
  private void updateCache(Cache<T> cacheList<StringcreatePathsboolean success,
      String updatePath, T dataStat stat) {
    if (createPaths == null || createPaths.isEmpty()) {
      if (success) {
        cache.update(updatePathdatastat);
      }
    } else {
      String firstPath = firstCachePath(createPaths);
      if (firstPath != null) {
        cache.updateRecursive(firstPath);
      }
    }
  }
  public boolean create(String path, T dataint options) {
    String clientPath = path;
    String serverPath = prependChroot(clientPath);
    Cache<T> cache = getCache(serverPath);
    if (cache != null) {
      try {
        cache.lockWrite();
        ZkBaseDataAccessor<T>.AccessResult result =
            .doCreate(serverPathdataoptions);
        boolean success = (result._retCode == .);
        updateCache(cacheresult._pathCreatedsuccessserverPathdata.);
        return success;
      } finally {
        cache.unlockWrite();
      }
    }
    // no cache
    return .create(serverPathdataoptions);
  }
  public boolean set(String path, T dataint options) {
    return set(pathdata, -1, options);
  }
  public boolean set(String path, T dataint expectVersionint options) {
    String clientPath = path;
    String serverPath = prependChroot(clientPath);
    Cache<T> cache = getCache(serverPath);
    if (cache != null) {
      try {
        cache.lockWrite();
        ZkBaseDataAccessor<T>.AccessResult result =
            .doSet(serverPathdataexpectVersionoptions);
        boolean success = (result._retCode == .);
        updateCache(cacheresult._pathCreatedsuccessserverPathdataresult._stat);
        return success;
      } catch (Exception e) {
        return false;
      } finally {
        cache.unlockWrite();
      }
    }
    // no cache
    return .set(serverPathdataexpectVersionoptions);
  }
  public boolean update(String pathDataUpdater<T> updaterint options) {
    String clientPath = path;
    String serverPath = prependChroot(clientPath);
    Cache<T> cache = getCache(serverPath);
    if (cache != null) {
      try {
        cache.lockWrite();
        ZkBaseDataAccessor<T>.AccessResult result =
            .doUpdate(serverPathupdateroptions);
        boolean success = (result._retCode == .);
        updateCache(cacheresult._pathCreatedsuccessserverPathresult._resultValue,
            result._stat);
        return success;
      } finally {
        cache.unlockWrite();
      }
    }
    // no cache
    return .commit(optionsserverPathupdater);
    // return _baseAccessor.update(serverPath, updater, options);
  }
  public boolean exists(String pathint options) {
    String clientPath = path;
    String serverPath = prependChroot(clientPath);
    Cache<T> cache = getCache(serverPath);
    if (cache != null) {
      boolean exists = cache.exists(serverPath);
      if (exists) {
        return true;
      }
    }
    // if not exists in cache, always fall back to zk
    return .exists(serverPathoptions);
  }
  public boolean remove(String pathint options) {
    String clientPath = path;
    String serverPath = prependChroot(clientPath);
    Cache<T> cache = getCache(serverPath);
    if (cache != null) {
      try {
        cache.lockWrite();
        boolean success = .remove(serverPathoptions);
        if (success) {
          cache.purgeRecursive(serverPath);
        }
        return success;
      } finally {
        cache.unlockWrite();
      }
    }
    // no cache
    return .remove(serverPathoptions);
  }
  public T get(String pathStat statint options) {
    String clientPath = path;
    String serverPath = prependChroot(clientPath);
    Cache<T> cache = getCache(serverPath);
    if (cache != null) {
      T record = null;
      ZNode znode = cache.get(serverPath);
      if (znode != null) {
        // TODO: shall return a deep copy instead of reference
        record = ((T) znode.getData());
        if (stat != null) {
          DataTree.copyStat(znode.getStat(), stat);
        }
        return record;
      } else {
        // if cache miss, fall back to zk and update cache
        try {
          cache.lockWrite();
          record =
              
                  .get(serverPathstatoptions | .);
          cache.update(serverPathrecordstat);
        } catch (ZkNoNodeException e) {
          if (AccessOption.isThrowExceptionIfNotExist(options)) {
            throw e;
          }
        } finally {
          cache.unlockWrite();
        }
        return record;
      }
    }
    // no cache
    return .get(serverPathstatoptions);
  }
  public Stat getStat(String pathint options) {
    String clientPath = path;
    String serverPath = prependChroot(clientPath);
    Cache<T> cache = getCache(serverPath);
    if (cache != null) {
      Stat stat = new Stat();
      ZNode znode = cache.get(serverPath);
      if (znode != null) {
        return znode.getStat();
      } else {
        // if cache miss, fall back to zk and update cache
        try {
          cache.lockWrite();
          T data = .get(serverPathstatoptions);
          cache.update(serverPathdatastat);
        } catch (ZkNoNodeException e) {
          return null;
        } finally {
          cache.unlockWrite();
        }
        return stat;
      }
    }
    // no cache
    return .getStat(serverPathoptions);
  }
  public boolean[] createChildren(List<StringpathsList<T> recordsint options) {
    final int size = paths.size();
    List<StringserverPaths = prependChroot(paths);
    Cache<T> cache = getCache(serverPaths);
    if (cache != null) {
      try {
        cache.lockWrite();
        boolean[] needCreate = new boolean[size];
        Arrays.fill(needCreatetrue);
        List<ZkBaseDataAccessor<T>.AccessResultresults =
            .doCreate(serverPathsrecordsneedCreateoptions);
        boolean[] success = new boolean[size];
        for (int i = 0; i < sizei++) {
          ZkBaseDataAccessor<T>.AccessResult result = results.get(i);
          success[i] = (result._retCode == .);
          updateCache(cacheresults.get(i).success[i], serverPaths.get(i),
              records.get(i), .);
        }
        return success;
      } finally {
        cache.unlockWrite();
      }
    }
    // no cache
    return .createChildren(serverPathsrecordsoptions);
  }
  public boolean[] setChildren(List<StringpathsList<T> recordsint options) {
    final int size = paths.size();
    List<StringserverPaths = prependChroot(paths);
    Cache<T> cache = getCache(serverPaths);
    if (cache != null) {
      try {
        cache.lockWrite();
        List<ZkBaseDataAccessor<T>.AccessResultresults =
            .doSet(serverPathsrecordsoptions);
        boolean[] success = new boolean[size];
        for (int i = 0; i < sizei++) {
          success[i] = (results.get(i). == .);
          updateCache(cacheresults.get(i).success[i], serverPaths.get(i),
              records.get(i), results.get(i).);
        }
        return success;
      } finally {
        cache.unlockWrite();
      }
    }
    return .setChildren(serverPathsrecordsoptions);
  }
  public boolean[] updateChildren(List<StringpathsList<DataUpdater<T>> updatersint options) {
    final int size = paths.size();
    List<StringserverPaths = prependChroot(paths);
    Cache<T> cache = getCache(serverPaths);
    if (cache != null) {
      try {
        cache.lockWrite();
        boolean[] success = new boolean[size];
        List<List<String>> pathsCreatedList =
            new ArrayList<List<String>>(Collections.<List<String>> nCopies(sizenull));
        List<ZkBaseDataAccessor<T>.AccessResultresults = .doUpdate(serverPathsupdatersoptions);
        for (int i = 0; i < sizei++) {
          ZkBaseDataAccessor<T>.AccessResult result = results.get(i);
          success[i] = (result._retCode == .);
            updateCache(cachepathsCreatedList.get(i), success[i], serverPaths.get(i),
              result._resultValueresults.get(i).);
        }
        return success;
      } finally {
        cache.unlockWrite();
      }
    }
    // no cache
    return .updateChildren(serverPathsupdatersoptions);
  }
  // TODO: change to use async_exists
  public boolean[] exists(List<Stringpathsint options) {
    final int size = paths.size();
    List<StringserverPaths = prependChroot(paths);
    boolean exists[] = new boolean[size];
    for (int i = 0; i < sizei++) {
      exists[i] = exists(serverPaths.get(i), options);
    }
    return exists;
  }
  public boolean[] remove(List<Stringpathsint options) {
    final int size = paths.size();
    List<StringserverPaths = prependChroot(paths);
    Cache<T> cache = getCache(serverPaths);
    if (cache != null) {
      try {
        cache.lockWrite();
        boolean[] success = .remove(serverPathsoptions);
        for (int i = 0; i < sizei++) {
          if (success[i]) {
            cache.purgeRecursive(serverPaths.get(i));
          }
        }
        return success;
      } finally {
        cache.unlockWrite();
      }
    }
    // no cache
    return .remove(serverPathsoptions);
  }
  public List<T> get(List<StringpathsList<Statstatsint options) {
    if (paths == null || paths.isEmpty()) {
      return Collections.emptyList();
    }
    final int size = paths.size();
    List<StringserverPaths = prependChroot(paths);
    List<T> records = new ArrayList<T>(Collections.<T> nCopies(sizenull));
    List<StatreadStats = new ArrayList<Stat>(Collections.<StatnCopies(sizenull));
    boolean needRead = false;
    boolean needReads[] = new boolean[size]; // init to false
    Cache<T> cache = getCache(serverPaths);
    if (cache != null) {
      try {
        cache.lockRead();
        for (int i = 0; i < sizei++) {
          ZNode zNode = cache.get(serverPaths.get(i));
          if (zNode != null) {
            // TODO: shall return a deep copy instead of reference
            records.set(i, (T) zNode.getData());
            readStats.set(izNode.getStat());
          } else {
            needRead = true;
            needReads[i] = true;
          }
        }
      } finally {
        cache.unlockRead();
      }
      // cache miss, fall back to zk and update cache
      if (needRead) {
        cache.lockWrite();
        try {
          List<ZkBaseDataAccessor<T>.AccessResultreadResults =
              .doGet(serverPathsneedReads);
          for (int i = 0; i < sizei++) {
            if (needReads[i]) {
              records.set(ireadResults.get(i).);
              readStats.set(ireadResults.get(i).);
              cache.update(serverPaths.get(i), records.get(i), readStats.get(i));
            }
          }
        } finally {
          cache.unlockWrite();
        }
      }
      if (stats != null) {
        stats.clear();
        stats.addAll(readStats);
      }
      return records;
    }
    // no cache
    return .get(serverPathsstatsoptions);
  }
  // TODO: add cache
  public Stat[] getStats(List<Stringpathsint options) {
    List<StringserverPaths = prependChroot(paths);
    return .getStats(serverPathsoptions);
  }
  public List<StringgetChildNames(String parentPathint options) {
    String serverParentPath = prependChroot(parentPath);
    Cache<T> cache = getCache(serverParentPath);
    if (cache != null) {
      // System.out.println("zk-cache");
      ZNode znode = cache.get(serverParentPath);
      if (znode != null && znode.getChildSet() != Collections.<StringemptySet()) {
        // System.out.println("zk-cache-hit: " + parentPath);
        List<StringchildNames = new ArrayList<String>(znode.getChildSet());
        Collections.sort(childNames);
        return childNames;
      } else {
        // System.out.println("zk-cache-miss");
        try {
          cache.lockWrite();
          List<StringchildNames = .getChildNames(serverParentPathoptions);
          // System.out.println("\t--" + childNames);
          cache.addToParentChildSet(serverParentPathchildNames);
          return childNames;
        } finally {
          cache.unlockWrite();
        }
      }
    }
    // no cache
    return .getChildNames(serverParentPathoptions);
  }
  public List<T> getChildren(String parentPathList<Statstatsint options) {
    List<StringchildNames = getChildNames(parentPathoptions);
    if (childNames == null) {
      return null;
    }
    List<Stringpaths = new ArrayList<String>();
    for (String childName : childNames) {
      String path = parentPath + "/" + childName;
      paths.add(path);
    }
    return get(pathsstatsoptions);
  }
  public void subscribeDataChanges(String pathIZkDataListener listener) {
    String serverPath = prependChroot(path);
    .subscribeDataChanges(serverPathlistener);
  }
  public void unsubscribeDataChanges(String pathIZkDataListener listener) {
    String serverPath = prependChroot(path);
    .unsubscribeDataChanges(serverPathlistener);
  }
  public List<StringsubscribeChildChanges(String pathIZkChildListener listener) {
    String serverPath = prependChroot(path);
    return .subscribeChildChanges(serverPathlistener);
  }
  public void unsubscribeChildChanges(String pathIZkChildListener listener) {
    String serverPath = prependChroot(path);
    .unsubscribeChildChanges(serverPathlistener);
  }
  public void subscribe(String parentPathHelixPropertyListener listener) {
    String serverPath = prependChroot(parentPath);
    .subscribe(serverPathlistener);
  }
  public void unsubscribe(String parentPathHelixPropertyListener listener) {
    String serverPath = prependChroot(parentPath);
    .unsubscribe(serverPathlistener);
  }
  public void start() {
    .info("START: Init ZkCacheBaseDataAccessor: " +  + ", " +  + ", "
        + );
    // start event thread
    try {
      if ( != null) {
        .warn( + " has already started");
      } else {
        if ( == null || .isEmpty()) {
          .warn("ZkCachePaths is null or empty. Will not start ZkCacheEventThread");
        } else {
          .debug("Starting ZkCacheEventThread...");
           = new ZkCacheEventThread("");
          .start();
        }
      }
    } catch (InterruptedException e) {
      .error("Current thread is interrupted when starting ZkCacheEventThread. "e);
    } finally {
      .unlock();
    }
    .debug("Start ZkCacheEventThread...done");
    if ( != null && !.isEmpty()) {
      for (String path : ) {
        .put(path);
      }
    }
    if ( != null && !.isEmpty()) {
      for (String path : ) {
        .put(path);
      }
    }
  }
  public void stop() {
    try {
      if ( != null) {
        .close();
         = null;
      }
      if ( == null) {
        .warn( + " has already stopped");
        return;
      }
      .debug("Stopping ZkCacheEventThread...");
      .interrupt();
      .join(2000);
       = null;
    } catch (InterruptedException e) {
      .error("Current thread is interrupted when stopping ZkCacheEventThread.");
    } finally {
      .unlock();
    }
    .debug("Stop ZkCacheEventThread...done");
  }
  public void reset() {
    if ( != null) {
      .reset();
    }
    if ( != null) {
      .reset();
    }
  }
New to GrepCode? Check out our FAQ X