Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   *
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.hadoop.hbase.zookeeper;
 
 import java.io.File;
 import java.io.Reader;
 import java.util.List;
 
TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead of redoing it, we should contribute updates to their code which let us more easily access testing helper objects.
 
 public class MiniZooKeeperCluster {
   private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
 
   private static final int TICK_TIME = 2000;
   private static final int CONNECTION_TIMEOUT = 30000;
 
   private boolean started;

  
The default port. If zero, we use a random port.
 
   private int defaultClientPort = 0;
 
   private int clientPort;
 
   private List<IntegerclientPortList;
 
   private int activeZKServerIndex;
   private int tickTime = 0;
 
 
   public MiniZooKeeperCluster() {
     this(new Configuration());
   }
 
   public MiniZooKeeperCluster(Configuration configuration) {
     this. = false;
     this. = configuration;
      = -1;
      = new ArrayList<Integer>();
   }
 
   public void setDefaultClientPort(int clientPort) {
     if (clientPort <= 0) {
       throw new IllegalArgumentException("Invalid default ZK client port: "
           + clientPort);
     }
     this. = clientPort;
   }

  
Selects a ZK client port. Returns the default port if specified. Otherwise, returns a random port. The random port is selected from the range between 49152 to 65535. These ports cannot be registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports).
  private int selectClientPort() {
    if ( > 0) {
      return ;
    }
    return 0xc000 + new Random().nextInt(0x3f00);
  }
  public void setTickTime(int tickTime) {
    this. = tickTime;
  }
  public int getBackupZooKeeperServerNum() {
    return .size()-1;
  }
  public int getZooKeeperServerNum() {
    return .size();
  }
  // / XXX: From o.a.zk.t.ClientBase
  private static void setupTestEnv() {
    // during the tests we run with 100K prealloc in the logs.
    // on windows systems prealloc of 64M was seen to take ~15seconds
    // resulting in test failure (client timeout on first session).
    // set env and directly in order to handle static init/gc issues
    System.setProperty("zookeeper.preAllocSize""100");
    FileTxnLog.setPreallocSize(100 * 1024);
  }
  public int startup(File baseDirthrows IOExceptionInterruptedException {
    return startup(baseDir,1);
  }

  

Parameters:
baseDir
numZooKeeperServers
Returns:
ClientPort server bound to, -1 if there was a binding problem and we couldn't pick another port.
Throws:
java.io.IOException
java.lang.InterruptedException
  public int startup(File baseDirint numZooKeeperServersthrows IOException,
    if (numZooKeeperServers <= 0)
      return -1;
    setupTestEnv();
    shutdown();
    int tentativePort = selectClientPort();
    // running all the ZK servers
    for (int i = 0; i < numZooKeeperServersi++) {
      File dir = new File(baseDir"zookeeper_"+i).getAbsoluteFile();
      createDir(dir);
      int tickTimeToUse;
      if (this. > 0) {
        tickTimeToUse = this.;
      } else {
        tickTimeToUse = ;
      }
      ZooKeeperServer server = new ZooKeeperServer(dirdirtickTimeToUse);
      NIOServerCnxnFactory standaloneServerFactory;
      while (true) {
        try {
          standaloneServerFactory = new NIOServerCnxnFactory();
          standaloneServerFactory.configure(
            new InetSocketAddress(tentativePort),
              1000));
        } catch (BindException e) {
          .debug("Failed binding ZK Server to client port: " +
              tentativePorte);
          // We're told to use some port but it's occupied, fail
          if ( > 0) return -1;
          // This port is already in use, try to use another.
          tentativePort = selectClientPort();
          continue;
        }
        break;
      }
      // Start up this ZK server
      standaloneServerFactory.startup(server);
      if (!waitForServerUp(tentativePort)) {
        throw new IOException("Waiting for startup of standalone server");
      }
      // We have selected this port as a client port.
      .add(tentativePort);
      .add(standaloneServerFactory);
      .add(server);
      tentativePort++; //for the next server
    }
    // set the first one to be active ZK; Others are backups
     = 0;
     = true;
    .info("Started MiniZK Cluster and connect 1 ZK server " +
        "on client port: " + );
    return ;
  }
  private void createDir(File dirthrows IOException {
    try {
      if (!dir.exists()) {
        dir.mkdirs();
      }
    } catch (SecurityException e) {
      throw new IOException("creating dir: " + dire);
    }
  }

  
  public void shutdown() throws IOException {
    if (!) {
      return;
    }
    // shut down all the zk servers
    for (int i = 0; i < .size(); i++) {
      NIOServerCnxnFactory standaloneServerFactory =
      int clientPort = .get(i);
      standaloneServerFactory.shutdown();
      if (!waitForServerDown(clientPort)) {
        throw new IOException("Waiting for shutdown of standalone server");
      }
    }
    for (ZooKeeperServer zkServer) {
      //explicitly close ZKDatabase since ZookeeperServer does not close them
      zkServer.getZKDatabase().close();
    }
    // clear everything
     = false;
     = 0;
    .info("Shutdown MiniZK cluster with all ZK servers");
  }

  

Returns:
clientPort return clientPort if there is another ZK backup can run when killing the current active; return -1, if there is no backups.
Throws:
java.io.IOException
java.lang.InterruptedException
                                        InterruptedException {
    if (! ||  < 0 ) {
      return -1;
    }
    // Shutdown the current active one
    NIOServerCnxnFactory standaloneServerFactory =
    int clientPort = .get();
    standaloneServerFactory.shutdown();
    if (!waitForServerDown(clientPort)) {
      throw new IOException("Waiting for shutdown of standalone server");
    }
    // remove the current active zk server
    .info("Kill the current active ZK servers in the cluster " +
        "on client port: " + clientPort);
    if (.size() == 0) {
      // there is no backup servers;
      return -1;
    }
    clientPort = .get();
    .info("Activate a backup zk server in the cluster " +
        "on client port: " + clientPort);
    // return the next back zk server's port
    return clientPort;
  }

  
Kill one back up ZK servers

  public void killOneBackupZooKeeperServer() throws IOException,
                                        InterruptedException {
    if (! ||  < 0 ||
        .size() <= 1) {
      return ;
    }
    int backupZKServerIndex = +1;
    // Shutdown the current active one
    NIOServerCnxnFactory standaloneServerFactory =
      .get(backupZKServerIndex);
    int clientPort = .get(backupZKServerIndex);
    standaloneServerFactory.shutdown();
    if (!waitForServerDown(clientPort)) {
      throw new IOException("Waiting for shutdown of standalone server");
    }
    .get(backupZKServerIndex).getZKDatabase().close();
    // remove this backup zk server
    .remove(backupZKServerIndex);
    .remove(backupZKServerIndex);
    .remove(backupZKServerIndex);
    .info("Kill one backup ZK servers in the cluster " +
        "on client port: " + clientPort);
  }
  // XXX: From o.a.zk.t.ClientBase
  private static boolean waitForServerDown(int portlong timeout) {
    long start = System.currentTimeMillis();
    while (true) {
      try {
        Socket sock = new Socket("localhost"port);
        try {
          OutputStream outstream = sock.getOutputStream();
          outstream.write("stat".getBytes());
          outstream.flush();
        } finally {
          sock.close();
        }
      } catch (IOException e) {
        return true;
      }
      if (System.currentTimeMillis() > start + timeout) {
        break;
      }
      try {
        Thread.sleep(250);
      } catch (InterruptedException e) {
        // ignore
      }
    }
    return false;
  }
  // XXX: From o.a.zk.t.ClientBase
  private static boolean waitForServerUp(int portlong timeout) {
    long start = System.currentTimeMillis();
    while (true) {
      try {
        Socket sock = new Socket("localhost"port);
        BufferedReader reader = null;
        try {
          OutputStream outstream = sock.getOutputStream();
          outstream.write("stat".getBytes());
          outstream.flush();
          Reader isr = new InputStreamReader(sock.getInputStream());
          reader = new BufferedReader(isr);
          String line = reader.readLine();
          if (line != null && line.startsWith("Zookeeper version:")) {
            return true;
          }
        } finally {
          sock.close();
          if (reader != null) {
            reader.close();
          }
        }
      } catch (IOException e) {
        // ignore as this is expected
        .info("server localhost:" + port + " not up " + e);
      }
      if (System.currentTimeMillis() > start + timeout) {
        break;
      }
      try {
        Thread.sleep(250);
      } catch (InterruptedException e) {
        // ignore
      }
    }
    return false;
  }
  public int getClientPort() {
    return ;
  }
New to GrepCode? Check out our FAQ X