Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.helix.manager.zk;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
 
 import java.util.List;
 
ZKClient does not provide some functionalities, this will be used for quick fixes if any bug found in ZKClient or if we need additional features but can't wait for the new ZkClient jar Ideally we should commit the changes we do here to ZKClient.
 
 
 public class ZkClient extends org.I0Itec.zkclient.ZkClient {
   private static Logger LOG = Logger.getLogger(ZkClient.class);
   public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
   public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
   // public static String sessionId;
   // public static String sessionPassword;
 
 
   public ZkClient(IZkConnection connectionint connectionTimeout,
       PathBasedZkSerializer zkSerializer) {
     super(connectionconnectionTimeoutnew ByteArraySerializer());
      = zkSerializer;
     if (.isTraceEnabled()) {
       StackTraceElement[] calls = Thread.currentThread().getStackTrace();
       .trace("creating a zkclient. callstack: " + Arrays.asList(calls));
     }
   }
 
   public ZkClient(IZkConnection connectionint connectionTimeoutZkSerializer zkSerializer) {
     this(connectionconnectionTimeoutnew BasicZkSerializer(zkSerializer));
   }
 
   public ZkClient(IZkConnection connectionint connectionTimeout) {
     this(connectionconnectionTimeoutnew SerializableSerializer());
   }
 
   public ZkClient(IZkConnection connection) {
     this(connection.new SerializableSerializer());
   }
 
   public ZkClient(String zkServersint sessionTimeoutint connectionTimeout,
       ZkSerializer zkSerializer) {
     this(new ZkConnection(zkServerssessionTimeout), connectionTimeoutzkSerializer);
   }
 
   public ZkClient(String zkServersint sessionTimeoutint connectionTimeout,
       PathBasedZkSerializer zkSerializer) {
     this(new ZkConnection(zkServerssessionTimeout), connectionTimeoutzkSerializer);
   }
 
   public ZkClient(String zkServersint sessionTimeoutint connectionTimeout) {
     this(new ZkConnection(zkServerssessionTimeout), connectionTimeout,
         new SerializableSerializer());
   }
 
   public ZkClient(String zkServersint connectionTimeout) {
     this(new ZkConnection(zkServers), connectionTimeoutnew SerializableSerializer());
   }
 
   public ZkClient(String zkServers) {
    this(new ZkConnection(zkServers), .new SerializableSerializer());
  }
  {
  }
  public void setZkSerializer(ZkSerializer zkSerializer) {
     = new BasicZkSerializer(zkSerializer);
  }
  public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
     = zkSerializer;
  }
    return ;
  }
  public void close() throws ZkInterruptedException {
    if (.isTraceEnabled()) {
      StackTraceElement[] calls = Thread.currentThread().getStackTrace();
      .trace("closing a zkclient. callStack: " + Arrays.asList(calls));
    }
    getEventLock().lock();
    try {
      if ( == null) {
        return;
      }
      .info("Closing zkclient: " + ((ZkConnection).getZookeeper());
      super.close();
    } catch (ZkInterruptedException e) {
      
HELIX-264: calling ZkClient#close() in its own eventThread context will throw ZkInterruptedException and skip ZkConnection#close()
      if ( != null) {
        try {
          
ZkInterruptedException#construct() honors InterruptedException by calling Thread.currentThread().interrupt(); clear it first, so we can safely close the zk-connection
          Thread.interrupted();
          .close();
           = null;

          
restore interrupted status of current thread
          Thread.currentThread().interrupt();
        } catch (InterruptedException e1) {
          throw new ZkInterruptedException(e1);
        }
      }
    } finally {
      getEventLock().unlock();
      .info("Closed zkclient");
    }
  }
  public Stat getStat(final String path) {
    long startT = System.nanoTime();
    try {
      Stat stat = retryUntilConnected(new Callable<Stat>() {
        @Override
        public Stat call() throws Exception {
          Stat stat = ((ZkConnection).getZookeeper().exists(pathfalse);
          return stat;
        }
      });
      return stat;
    } finally {
      long endT = System.nanoTime();
      if (.isTraceEnabled()) {
        .trace("exists, path: " + path + ", time: " + (endT - startT) + " ns");
      }
    }
  }
  // override exists(path, watch), so we can record all exists requests
  protected boolean exists(final String pathfinal boolean watch) {
    long startT = System.nanoTime();
    try {
      return retryUntilConnected(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return .exists(pathwatch);
        }
      });
    } finally {
      long endT = System.nanoTime();
      if (.isTraceEnabled()) {
        .trace("exists, path: " + path + ", time: " + (endT - startT) + " ns");
      }
    }
  }
  // override getChildren(path, watch), so we can record all getChildren requests
  protected List<StringgetChildren(final String pathfinal boolean watch) {
    long startT = System.nanoTime();
    try {
      return retryUntilConnected(new Callable<List<String>>() {
        @Override
        public List<Stringcall() throws Exception {
          return .getChildren(pathwatch);
        }
      });
    } finally {
      long endT = System.nanoTime();
      if (.isTraceEnabled()) {
        .trace("getChildren, path: " + path + ", time: " + (endT - startT) + " ns");
      }
    }
  }
  @SuppressWarnings("unchecked")
  public <T extends Object> T deserialize(byte[] dataString path) {
    if (data == null) {
      return null;
    }
    return (T) .deserialize(datapath);
  }
  // override readData(path, stat, watch), so we can record all read requests
  @SuppressWarnings("unchecked")
  protected <T extends Object> T readData(final String pathfinal Stat statfinal boolean watch) {
    long startT = System.nanoTime();
    try {
      byte[] data = retryUntilConnected(new Callable<byte[]>() {
        @Override
        public byte[] call() throws Exception {
          return .readData(pathstatwatch);
        }
      });
      return (T) deserialize(datapath);
    } finally {
      long endT = System.nanoTime();
      if (.isTraceEnabled()) {
        .trace("getData, path: " + path + ", time: " + (endT - startT) + " ns");
      }
    }
  }
  @SuppressWarnings("unchecked")
  public <T extends Object> T readDataAndStat(String pathStat stat,
      boolean returnNullIfPathNotExists) {
    T data = null;
    try {
      data = (T) super.readData(pathstat);
    } catch (ZkNoNodeException e) {
      if (!returnNullIfPathNotExists) {
        throw e;
      }
    }
    return data;
  }
  public String getServers() {
    return .getServers();
  }
  public byte[] serialize(Object dataString path) {
    return .serialize(datapath);
  }
  public void writeData(final String pathObject datatfinal int expectedVersion) {
    long startT = System.nanoTime();
    try {
      final byte[] data = serialize(datatpath);
      retryUntilConnected(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
          .writeData(pathdataexpectedVersion);
          return null;
        }
      });
    } finally {
      long endT = System.nanoTime();
      if (.isTraceEnabled()) {
        .trace("setData, path: " + path + ", time: " + (endT - startT) + " ns");
      }
    }
  }
  public Stat writeDataGetStat(final String pathObject datatfinal int expectedVersion)
      throws InterruptedException {
    long start = System.nanoTime();
    try {
      final byte[] bytes = .serialize(datatpath);
      return retryUntilConnected(new Callable<Stat>() {
        @Override
        public Stat call() throws Exception {
          return ((ZkConnection).getZookeeper().setData(pathbytesexpectedVersion);
        }
      });
    } finally {
      long end = System.nanoTime();
      if (.isTraceEnabled()) {
        .trace("setData, path: " + path + ", time: " + (end - start) + " ns");
      }
    }
  }
  public String create(final String pathObject datafinal CreateMode mode)
    if (path == null) {
      throw new NullPointerException("path must not be null.");
    }
    long startT = System.nanoTime();
    try {
      final byte[] bytes = data == null ? null : serialize(datapath);
      return retryUntilConnected(new Callable<String>() {
        @Override
        public String call() throws Exception {
          return .create(pathbytesmode);
        }
      });
    } finally {
      long endT = System.nanoTime();
      if (.isTraceEnabled()) {
        .trace("create, path: " + path + ", time: " + (endT - startT) + " ns");
      }
    }
  }
  public boolean delete(final String path) {
    return this.delete(path, -1);
  }
  public boolean delete(final String pathfinal int version) {
    long startT = System.nanoTime();
    try {
      try {
        retryUntilConnected(new Callable<Object>() {
          @Override
          public Object call() throws Exception {
            ZkConnection connection = (ZkConnection;
            connection.getZookeeper().delete(pathversion);
            return null;
          }
        });
        return true;
      } catch (ZkNoNodeException e) {
        return false;
      }
    } finally {
      long endT = System.nanoTime();
      if (.isTraceEnabled()) {
        .trace("delete, path: " + path + ", time: " + (endT - startT) + " ns");
      }
    }
  }
  public void asyncCreate(final String pathObject datatfinal CreateMode mode,
      final CreateCallbackHandler cb) {
    final byte[] data = (datat == null ? null : serialize(datatpath));
      @Override
      public Object call() throws Exception {
        ((ZkConnection).getZookeeper().create(pathdata.// Arrays.asList(DEFAULT_ACL),
            modecbnull);
        return null;
      }
    });
  }
  public void asyncSetData(final String pathObject datatfinal int version,
      final SetDataCallbackHandler cb) {
    final byte[] data = serialize(datatpath);
      @Override
      public Object call() throws Exception {
        ((ZkConnection).getZookeeper().setData(pathdataversioncbnull);
        return null;
      }
    });
  }
  public void asyncGetData(final String pathfinal GetDataCallbackHandler cb) {
      @Override
      public Object call() throws Exception {
        ((ZkConnection).getZookeeper().getData(pathnullcbnull);
        return null;
      }
    });
  }
  public void asyncExists(final String pathfinal ExistsCallbackHandler cb) {
      @Override
      public Object call() throws Exception {
        ((ZkConnection).getZookeeper().exists(pathnullcbnull);
        return null;
      }
    });
  }
  public void asyncDelete(final String pathfinal DeleteCallbackHandler cb) {
      @Override
      public Object call() throws Exception {
        ((ZkConnection).getZookeeper().delete(path, -1, cbnull);
        return null;
      }
    });
  }
New to GrepCode? Check out our FAQ X