Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package org.apache.solr.cloud;
  
  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 import java.util.List;
 import java.util.Map;
 
 
The monkey can stop random or specific jetties used with SolrCloud. It can also run in a background thread and start and stop jetties randomly. TODO: expire multiple sessions / connectionloss at once TODO: kill multiple jetties at once TODO: ? add random headhunter mode that always kills the leader TODO: chaosmonkey should be able to do cluster stop/start tests
 
 public class ChaosMonkey {
   private static Logger log = LoggerFactory.getLogger(ChaosMonkey.class);
   
   private static final int CONLOSS_PERCENT = 10; // 0 - 10 = 0 - 100%
   private static final int EXPIRE_PERCENT = 10; // 0 - 10 = 0 - 100%
   
   private static final boolean CONN_LOSS = Boolean.valueOf(System.getProperty("solr.tests.cloud.cm.connloss""true"));
   private static final boolean EXP = Boolean.valueOf(System.getProperty("solr.tests.cloud.cm.exp""true"));
   
   private ZkTestServer zkServer;
   private String collection;
   private volatile boolean stop = false;
   private AtomicInteger stops = new AtomicInteger();
   private AtomicInteger starts = new AtomicInteger();
   private AtomicInteger expires = new AtomicInteger();
   private AtomicInteger connloss = new AtomicInteger();
   
   private boolean expireSessions;
   private boolean causeConnectionLoss;
   private boolean aggressivelyKillLeaders;
   private volatile long startTime;
   
 
   private Thread monkeyThread;
   
   public ChaosMonkey(ZkTestServer zkServerZkStateReader zkStateReader,
       String collectionMap<String,List<CloudJettyRunner>> shardToJetty,
       Map<String,CloudJettyRunnershardToLeaderJetty) {
     this. = shardToJetty;
     this. = shardToLeaderJetty;
     this. = zkServer;
     this. = zkStateReader;
     this. = collection;
     Random random = LuceneTestCase.random();
      = //= random.nextBoolean();
     
      = ;//= random.nextBoolean();
     monkeyLog("init - expire sessions:" + 
         + " cause connection loss:" + );
   }
  
  // TODO: expire all clients at once?
  public void expireSession(final JettySolrRunner jetty) {
    monkeyLog("expire session for " + jetty.getLocalPort() + " !");
    
    SolrDispatchFilter solrDispatchFilter = (SolrDispatchFilterjetty
        .getDispatchFilter().getFilter();
    if (solrDispatchFilter != null) {
      CoreContainer cores = solrDispatchFilter.getCores();
      if (cores != null) {
        causeConnectionLoss(jetty);
      }
    }
    
//    Thread thread = new Thread() {
//      {
//        setDaemon(true);
//      }
//      public void run() {
//        SolrDispatchFilter solrDispatchFilter = (SolrDispatchFilter) jetty.getDispatchFilter().getFilter();
//        if (solrDispatchFilter != null) {
//          CoreContainer cores = solrDispatchFilter.getCores();
//          if (cores != null) {
//            try {
//              Thread.sleep(ZkTestServer.TICK_TIME * 2 + 800);
//            } catch (InterruptedException e) {
//              // we act as only connection loss
//              return;
//            }
//            long sessionId = cores.getZkController().getZkClient().getSolrZooKeeper().getSessionId();
//            zkServer.expire(sessionId);
//          }
//        }
//      }
//    };
//    thread.start();
  }
  
    String sliceName = getRandomSlice();
    
    if (jetty != null) {
      expireSession(jetty.jetty);
    }
  }
  
    monkeyLog("cause connection loss!");
    
    String sliceName = getRandomSlice();
    if (jetty != null) {
      causeConnectionLoss(jetty.jetty);
    }
  }
  
  public static void causeConnectionLoss(JettySolrRunner jetty) {
    SolrDispatchFilter solrDispatchFilter = (SolrDispatchFilterjetty
        .getDispatchFilter().getFilter();
    if (solrDispatchFilter != null) {
      CoreContainer cores = solrDispatchFilter.getCores();
      if (cores != null) {
        SolrZkClient zkClient = cores.getZkController().getZkClient();
        zkClient.getSolrZooKeeper().closeCnxn();
      }
    }
  }
  public CloudJettyRunner stopShard(String sliceint indexthrows Exception {
    CloudJettyRunner cjetty = .get(slice).get(index);
    stopJetty(cjetty);
    return cjetty;
  }
  public void stopJetty(CloudJettyRunner cjettythrows Exception {
    stop(cjetty.jetty);
  }
  public void killJetty(CloudJettyRunner cjettythrows Exception {
    kill(cjetty);
  }
  
  public void stopJetty(JettySolrRunner jettythrows Exception {
    stopJettySolrRunner(jetty);
  }
  
  private static void stopJettySolrRunner(JettySolrRunner jettythrows Exception {
    assert(jetty != null);
    monkeyLog("stop shard! " + jetty.getLocalPort());
    // get a clean shutdown so that no dirs are left open...
    FilterHolder fh = jetty.getDispatchFilter();
    if (fh != null) {
      SolrDispatchFilter sdf = (SolrDispatchFilterfh.getFilter();
      if (sdf != null) {
        sdf.destroy();
      }
    }
    jetty.stop();
    
    if (!jetty.isStopped()) {
      throw new RuntimeException("could not stop jetty");
    }
  }
  
  public static void kill(CloudJettyRunner cjettythrows Exception {
    JettySolrRunner jetty = cjetty.jetty;
    monkeyLog("kill shard! " + jetty.getLocalPort());
    
    jetty.stop();
    
    stop(jetty);
    
    if (!jetty.isStopped()) {
      throw new RuntimeException("could not kill jetty");
    }
  }
  
  public void stopShard(String slicethrows Exception {
    List<CloudJettyRunnerjetties = .get(slice);
    for (CloudJettyRunner jetty : jetties) {
      stopJetty(jetty);
    }
  }
  
  public void stopShardExcept(String sliceString shardNamethrows Exception {
    List<CloudJettyRunnerjetties = .get(slice);
    for (CloudJettyRunner jetty : jetties) {
      if (!jetty.nodeName.equals(shardName)) {
        stopJetty(jetty);
      }
    }
  }
  
  public JettySolrRunner getShard(String sliceint indexthrows Exception {
    JettySolrRunner jetty = .get(slice).get(index).;
    return jetty;
  }
  
  public CloudJettyRunner stopRandomShard() throws Exception {
    String sliceName = getRandomSlice();
    
    return stopRandomShard(sliceName);
  }
  
  public CloudJettyRunner stopRandomShard(String slicethrows Exception {
    if (cjetty != null) {
      stopJetty(cjetty);
    }
    return cjetty;
  }
  
  
  public CloudJettyRunner killRandomShard() throws Exception {
    // add all the shards to a list
    String sliceName = getRandomSlice();
    
    return killRandomShard(sliceName);
  }
  private String getRandomSlice() {
    
    List<StringsliceKeyList = new ArrayList<String>(slices.size());
    sliceKeyList.addAll(slices.keySet());
    String sliceName = sliceKeyList.get(LuceneTestCase.random().nextInt(sliceKeyList.size()));
    return sliceName;
  }
  
  public CloudJettyRunner killRandomShard(String slicethrows Exception {
    if (cjetty != null) {
      killJetty(cjetty);
    }
    return cjetty;
  }
  
  public CloudJettyRunner getRandomJetty(String sliceboolean aggressivelyKillLeadersthrows KeeperExceptionInterruptedException {
    
    int numActive = 0;
    
    numActive = checkIfKillIsLegal(slicenumActive);
    
    // TODO: stale state makes this a tough call
    if (numActive < 2) {
      // we cannot kill anyone
      monkeyLog("only one active node in shard - monkey cannot kill :(");
      return null;
    }
    
    // let's check the deadpool count
    int numRunning = 0;
    for (CloudJettyRunner cjetty : .get(slice)) {
      if (!.contains(cjetty)) {
        numRunning++;
      }
    }
    
    if (numRunning < 2) {
      // we cannot kill anyone
      monkeyLog("only one active node in shard - monkey cannot kill :(");
      return null;
    }
    
    Random random = LuceneTestCase.random();
    int chance = random.nextInt(10);
    CloudJettyRunner cjetty;
    if (chance <= 5 && aggressivelyKillLeaders) {
      // if killLeader, really aggressively go after leaders
      cjetty = .get(slice);
    } else {
      // get random shard
      List<CloudJettyRunnerjetties = .get(slice);
      int index = random.nextInt(jetties.size());
      cjetty = jetties.get(index);
      
      ZkNodeProps leader = null;
      try {
        leader = .getLeaderRetry(slice);
      } catch (Throwable t) {
        .error("Could not get leader"t);
        return null;
      }
      
      FilterHolder fh = cjetty.jetty.getDispatchFilter();
      if (fh == null) {
        monkeyLog("selected jetty not running correctly - skip");
        return null;
      }
      SolrDispatchFilter df = ((SolrDispatchFilterfh.getFilter());
      if (df == null) {
        monkeyLog("selected jetty not running correctly - skip");
        return null;
      }
      CoreContainer cores = df.getCores();
      if (cores == null) {
        monkeyLog("selected jetty not running correctly - skip");
        return null;
      }
      SolrCore core = cores.getCore(leader.getStr(.));
      if (core == null) {
        monkeyLog("selected jetty not running correctly - skip");
        return null;
      }
      // cluster state can be stale - also go by our 'near real-time' is leader prop
      boolean rtIsLeader;
      try {
        rtIsLeader = core.getCoreDescriptor().getCloudDescriptor().isLeader();
      } finally {
        core.close();
      }
      
      boolean isLeader = leader.getStr(.).equals(jetties.get(index).)
          || rtIsLeader;
      if (!aggressivelyKillLeaders && isLeader) {
        // we don't kill leaders...
        monkeyLog("abort! I don't kill leaders");
        return null;
      } 
    }
    if (cjetty.jetty.getLocalPort() == -1) {
      // we can't kill the dead
      monkeyLog("abort! This guy is already dead");
      return null;
    }
    
    //System.out.println("num active:" + numActive + " for " + slice + " sac:" + jetty.getLocalPort());
    monkeyLog("chose a victim! " + cjetty.jetty.getLocalPort());
  
    return cjetty;
  }
  private int checkIfKillIsLegal(String sliceint numActive)
      throws KeeperExceptionInterruptedException {
    for (CloudJettyRunner cloudJetty : .get(slice)) {
      
      // get latest cloud state
      
          .get(slice);
      
      ZkNodeProps props = theShards.getReplicasMap().get(cloudJetty.coreNodeName);
      if (props == null) {
        throw new RuntimeException("shard name " + cloudJetty.coreNodeName + " not found in " + theShards.getReplicasMap().keySet());
      }
      
      String state = props.getStr(.);
      String nodeName = props.getStr(.);
      
      if (cloudJetty.jetty.isRunning()
          && state.equals(.)
          && .getClusterState().liveNodesContain(nodeName)) {
        numActive++;
      }
    }
    return numActive;
  }
  
    // get latest cloud state
    // get random shard
    List<SolrServerclients = .get(slice);
    int index = LuceneTestCase.random().nextInt(clients.size() - 1);
    SolrServer client = clients.get(index);
    return client;
  }
  
  // synchronously starts and stops shards randomly, unless there is only one
  // active shard up for a slice or if there is one active and others recovering
  public void startTheMonkey(boolean killLeadersfinal int roundPauseUpperLimit) {
    monkeyLog("starting");
    this. = killLeaders;
     = System.currentTimeMillis();
    // TODO: when kill leaders is on, lets kill a higher percentage of leaders
    
     = false;
     = new Thread() {
      @Override
      public void run() {
        while (!) {
          try {
    
            Random random = LuceneTestCase.random();
            Thread.sleep(random.nextInt(roundPauseUpperLimit));
            if (random.nextBoolean()) {
             if (!.isEmpty()) {
               int index = random.nextInt(.size());
               JettySolrRunner jetty = .get(index).;
               if (!ChaosMonkey.start(jetty)) {
                 continue;
               }
               //System.out.println("started on port:" + jetty.getLocalPort());
               .remove(index);
               .incrementAndGet();
               continue;
             }
            }
            
            int rnd = random.nextInt(10);
            if ( && rnd < ) {
              expireRandomSession();
            } 
            
            if ( && rnd < ) {
              randomConnectionLoss();
            }
            
            CloudJettyRunner cjetty;
            if (random.nextBoolean()) {
              cjetty = stopRandomShard();
            } else {
              cjetty = killRandomShard();
            }
            if (cjetty == null) {
              // we cannot kill
            } else {
              .add(cjetty);
            }
            
          } catch (InterruptedException e) {
            //
          } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
        }
        monkeyLog("finished");
        monkeyLog("I ran for " + (System.currentTimeMillis() - )/1000.0f + "sec. I stopped " +  + " and I started " + 
            + ". I also expired " + .get() + " and caused " + 
            + " connection losses");
      }
    };
  }
  
  public static void monkeyLog(String msg) {
    .info("monkey: " + msg);
  }
  
  public void stopTheMonkey() {
     = true;
    try {
      .join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
  public int getStarts() {
    return .get();
  }
  public static void stop(List<JettySolrRunnerjettysthrows Exception {
    for (JettySolrRunner jetty : jettys) {
      stop(jetty);
    }
  }
  
  public static void stop(JettySolrRunner jettythrows Exception {
    stopJettySolrRunner(jetty);
  }
  
  public static void start(List<JettySolrRunnerjettysthrows Exception {
    for (JettySolrRunner jetty : jettys) {
      start(jetty);
    }
  }
  
  public static boolean start(JettySolrRunner jettythrows Exception {
    try {
      jetty.start();
    } catch (Exception e) {
      jetty.stop();
      Thread.sleep(3000);
      try {
        jetty.start();
      } catch (Exception e2) {
        jetty.stop();
        Thread.sleep(10000);
        try {
          jetty.start();
        } catch (Exception e3) {
          jetty.stop();
          Thread.sleep(30000);
          try {
            jetty.start();
          } catch (Exception e4) {
            .error("Could not get the port to start jetty again"e4);
            // we coud not get the port
            jetty.stop();
            return false;
          }
        }
      }
    }
    return true;
  }
New to GrepCode? Check out our FAQ X