Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
 
 package org.apache.hadoop.hbase.zookeeper.lock;
 
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 
 /**
  * ZooKeeper based HLock implementation. Based on the Shared Locks recipe.
  * (see:
  * <a href="http://zookeeper.apache.org/doc/trunk/recipes.html">
  * ZooKeeper Recipes and Solutions</a>)
  */
 
 public abstract class ZKInterProcessLockBase implements InterProcessLock {
 
   private static final Log LOG = LogFactory.getLog(ZKInterProcessLockBase.class);

  
   /** ZNode prefix used by processes acquiring reader locks. */
 
   protected static final String READ_LOCK_CHILD_NODE_PREFIX = "read-";

  
   /** ZNode prefix used by processes acquiring writer locks. */
 
   protected static final String WRITE_LOCK_CHILD_NODE_PREFIX = "write-";
 
   protected final ZooKeeperWatcher zkWatcher;
   protected final String parentLockNode;
   protected final String fullyQualifiedZNode;
   protected final String childZNode;
   protected final byte[] metadata;
   protected final MetadataHandler handler;
 
   // If we acquire a lock, update this field
   protected final AtomicReference<AcquiredLockacquiredLock =
       new AtomicReference<AcquiredLock>(null);

  
   /** Represents information about a lock held by this thread. */
 
   protected static class AcquiredLock {
     private final String path;
     private final int version;

    
Store information about a lock.

Parameters:
path The path to a lock's ZNode
version The current version of the lock's ZNode
 
     public AcquiredLock(String pathint version) {
       this. = path;
       this. = version;
     }
 
     public String getPath() {
       return ;
     }
 
     public int getVersion() {
       return ;
     }
 
     @Override
     public String toString() {
      return "AcquiredLockInfo{" +
          "path='" +  + '\'' +
          ", version=" +  +
          '}';
    }
  }
  protected static class ZNodeComparator implements Comparator<String> {
    public static final ZNodeComparator COMPARATOR = new ZNodeComparator();
    private ZNodeComparator() {
    }

    
Parses sequenceId from the znode name. Zookeeper documentation states: The sequence number is always fixed length of 10 digits, 0 padded
    public static long getChildSequenceId(String childZNode) {
      Preconditions.checkNotNull(childZNode);
      assert childZNode.length() >= 10;
      String sequenceIdStr = childZNode.substring(childZNode.length() - 10);
      return Long.parseLong(sequenceIdStr);
    }
    @Override
    public int compare(String zNode1String zNode2) {
      long seq1 = getChildSequenceId(zNode1);
      long seq2 = getChildSequenceId(zNode2);
      if (seq1 == seq2) {
        return 0;
      } else {
        return seq1 < seq2 ? -1 : 1;
      }
    }
  }

  
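  /**
   * Called by implementing classes.
   *
   * @param zkWatcher the ZooKeeper watcher to use
   * @param parentLockNode The lock ZNode path
   * @param metadata the lock metadata
   * @param handler the metadata handler
   * @param childNode The prefix for child nodes created under the parent
   */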
  protected ZKInterProcessLockBase(ZooKeeperWatcher zkWatcher,
      String parentLockNodebyte[] metadataMetadataHandler handlerString childNode) {
    this. = zkWatcher;
    this. = parentLockNode;
    this. = ZKUtil.joinZNode(parentLockNodechildNode);
    this. = metadata;
    this. = handler;
    this. = childNode;
  }

  
  public void acquire() throws IOExceptionInterruptedException {
    tryAcquire(-1);
  }

  
  public boolean tryAcquire(long timeoutMs)
    boolean hasTimeout = timeoutMs != -1;
    long waitUntilMs =
        hasTimeout ?EnvironmentEdgeManager.currentTimeMillis() + timeoutMs : -1;
    String createdZNode;
    try {
      createdZNode = createLockZNode();
    } catch (KeeperException ex) {
      throw new IOException("Failed to create znode: " + ex);
    }
    while (true) {
      List<Stringchildren;
      try {
        children = ZKUtil.listChildrenNoWatch();
      } catch (KeeperException e) {
        .error("Unexpected ZooKeeper error when listing children"e);
        throw new IOException("Unexpected ZooKeeper exception"e);
      }
      String pathToWatch;
      if ((pathToWatch = getLockPath(createdZNodechildren)) == null) {
        break;
      }
      CountDownLatch deletedLatch = new CountDownLatch(1);
      String zkPathToWatch =
          ZKUtil.joinZNode(pathToWatch);
      DeletionListener deletionListener =
          new DeletionListener(zkPathToWatchdeletedLatch);
      .registerListener(deletionListener);
      try {
        if (ZKUtil.setWatchIfNodeExists(zkPathToWatch)) {
          // Wait for the watcher to fire
          if (hasTimeout) {
            long remainingMs = waitUntilMs - EnvironmentEdgeManager.currentTimeMillis();
            if (remainingMs < 0 ||
                !deletedLatch.await(remainingMs.)) {
              .warn("Unable to acquire the lock in " + timeoutMs +
                  " milliseconds.");
              try {
                ZKUtil.deleteNode(createdZNode);
              } catch (KeeperException e) {
                .warn("Unable to remove ZNode " + createdZNode);
              }
              return false;
            }
          } else {
            deletedLatch.await();
          }
          if (deletionListener.hasException()) {
            Throwable t = deletionListener.getException();
            throw new IOException("Exception in the watcher"t);
          }
        }
      } catch (KeeperException e) {
        throw new IOException("Unexpected ZooKeeper exception"e);
      } finally {
        .unregisterListener(deletionListener);
      }
    }
    updateAcquiredLock(createdZNode);
    .debug("Acquired a lock for " + createdZNode);
    return true;
  }
  private String createLockZNode() throws KeeperException {
    try {
    } catch (KeeperException.NoNodeException nne) {
      //create parents, retry
      return createLockZNode();
    }
  }

  
  /**
   * Check if a child znode represents a read lock.
   *
   * @param child The child znode we want to check.
   * @return whether the child znode represents a read lock
   */
  protected static boolean isChildReadLock(String child) {
    int idx = child.lastIndexOf(.);
    String suffix = child.substring(idx + 1);
  }

  
  /**
   * Check if a child znode represents a write lock.
   *
   * @param child The child znode we want to check.
   * @return whether the child znode represents a write lock
   */
  protected static boolean isChildWriteLock(String child) {
    int idx = child.lastIndexOf(.);
    String suffix = child.substring(idx + 1);
  }

  
  /**
   * Check if a child znode represents the same type (read or write) of lock.
   *
   * @param child The child znode we want to check.
   * @return whether the child znode represents the same type (read or write)
   *         of lock
   */
  protected boolean isChildOfSameType(String child) {
    int idx = child.lastIndexOf(.);
    String suffix = child.substring(idx + 1);
    return suffix.startsWith(this.);
  }

  
  /**
   * Update state so as to indicate that a lock is held.
   *
   * @param createdZNode The lock znode
   * @throws java.io.IOException If an unrecoverable ZooKeeper error occurs
   */
  protected void updateAcquiredLock(String createdZNodethrows IOException {
    Stat stat = new Stat();
    byte[] data = null;
    Exception ex = null;
    try {
      data = ZKUtil.getDataNoWatch(createdZNodestat);
    } catch (KeeperException e) {
      .warn("Cannot getData for znode:" + createdZNodee);
      ex = e;
    }
    if (data == null) {
      .error("Can't acquire a lock on a non-existent node " + createdZNode);
      throw new IllegalStateException("ZNode " + createdZNode +
          "no longer exists!"ex);
    }
    AcquiredLock newLock = new AcquiredLock(createdZNodestat.getVersion());
    if (!.compareAndSet(nullnewLock)) {
      .error("The lock " +  +
          " has already been acquired by another process!");
          " is held by another process");
    }
  }

  
  public void release() throws IOExceptionInterruptedException {
    AcquiredLock lock = .get();
    if (lock == null) {
      .error("Cannot release lock" +
          ", process does not have a lock for " + );
      throw new IllegalStateException("No lock held for " + );
    }
    try {
      if (ZKUtil.checkExists(lock.getPath()) != -1) {
        boolean ret = ZKUtil.deleteNode(lock.getPath(), lock.getVersion());
        if (!ret && ZKUtil.checkExists(lock.getPath()) != -1) {
          throw new IllegalStateException("Couldn't delete " + lock.getPath());
        }
        if (!.compareAndSet(locknull)) {
          .debug("Current process no longer holds " + lock + " for " +
              );
          throw new IllegalStateException("Not holding a lock for " +
               +"!");
        }
      }
      if (.isDebugEnabled()) {
        .debug("Released " + lock.getPath());
      }
    } catch (BadVersionException e) {
      throw new IllegalStateException(e);
    } catch (KeeperException e) {
      throw new IOException(e);
    }
  }

  
  /**
   * Process metadata stored in a ZNode using a callback.
   *
   * @param lockZNode The node holding the metadata
   * @return True if metadata was ready and processed, false otherwise.
   */
  protected boolean handleLockMetadata(String lockZNode) {
    return handleLockMetadata(lockZNode);
  }

  
  /**
   * Process metadata stored in a ZNode using a callback object passed to
   * this instance.
   *
   * @param lockZNode The node holding the metadata
   * @param handler the metadata handler
   * @return True if metadata was ready and processed, false on exception.
   */
  protected boolean handleLockMetadata(String lockZNodeMetadataHandler handler) {
    if (handler == null) {
      return false;
    }
    try {
      byte[] metadata = ZKUtil.getData(lockZNode);
      handler.handleMetadata(metadata);
    } catch (KeeperException ex) {
      .warn("Error processing lock metadata in " + lockZNode);
      return false;
    }
    return true;
  }
  public void reapAllLocks() throws IOException {
  }

  
  /**
   * Will delete all lock znodes of this type (either read or write) which are
   * "expired" according to timeout. Assumption is that the clock skew between
   * zookeeper and this server is negligible. Referred to in the zk recipes as
   * "Revocable Shared Locks with Freaking Laser Beams".
   * (http://zookeeper.apache.org/doc/trunk/recipes.html).
   */
  public void reapExpiredLocks(long timeoutthrows IOException {
    List<Stringchildren;
    try {
      children = ZKUtil.listChildrenNoWatch();
    } catch (KeeperException e) {
      .error("Unexpected ZooKeeper error when listing children"e);
      throw new IOException("Unexpected ZooKeeper exception"e);
    }
    if (children == nullreturn;
    KeeperException deferred = null;
    Stat stat = new Stat();
    long expireDate = System.currentTimeMillis() - timeout//we are using cTime in zookeeper
    for (String child : children) {
      if (isChildOfSameType(child)) {
        String znode = ZKUtil.joinZNode(child);
        try {
          ZKUtil.getDataNoWatch(znodestat);
          if (stat.getCtime() < expireDate) {
            .info("Reaping lock for znode:" + znode);
            ZKUtil.deleteNodeFailSilent(znode);
          }
        } catch (KeeperException ex) {
          .warn("Error reaping the znode for write lock :" + znode);
          deferred = ex;
        }
      }
    }
    if (deferred != null) {
      throw new IOException("ZK exception while reaping locks:"deferred);
    }
  }

  
  /**
   * Visits the locks (both held and attempted) with the given MetadataHandler.
   *
   * @param handler the metadata handler invoked for each lock
   * @throws IOException If there is an unrecoverable error
   */
  public void visitLocks(MetadataHandler handlerthrows IOException {
    List<Stringchildren;
    try {
      children = ZKUtil.listChildrenNoWatch();
    } catch (KeeperException e) {
      .error("Unexpected ZooKeeper error when listing children"e);
      throw new IOException("Unexpected ZooKeeper exception"e);
    }
    if (children != null && children.size() > 0) {
      for (String child : children) {
        if (isChildOfSameType(child)) {
          String znode = ZKUtil.joinZNode(child);
          String childWatchesZNode = getLockPath(childchildren);
          if (childWatchesZNode == null) {
            .info("Lock is held by: " + child);
          }
          handleLockMetadata(znodehandler);
        }
      }
    }
  }

  
  /**
   * Determine based on a list of children under a ZNode, whether or not a
   * process which created a specified ZNode has obtained a lock. If a lock is
   * not obtained, return the path that we should watch awaiting its deletion.
   * Otherwise, return null.
   * This method is abstract as the logic for determining whether or not a
   * lock is obtained depends on the type of lock being implemented.
   *
   * @param myZNode The ZNode created by the process attempting to acquire
   *                a lock
   * @param children List of all child ZNodes under the lock's parent ZNode
   * @return The path to watch, or null if myZNode can represent a correctly
   *         acquired lock.
   */
  protected abstract String getLockPath(String myZNodeList<Stringchildren)
  throws IOException;
New to GrepCode? Check out our FAQ X