  package me.prettyprint.cassandra.connection;
  
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.perf4j.StopWatch;
import org.perf4j.slf4j.Slf4JStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// me.prettyprint.* imports (CassandraHost, CassandraHostConfigurator, ClockResolution,
// FailoverPolicy, Operation, JmxMonitor, the Hector exception types, ...) are not shown
// in this snippet.
  
 
 
 public class HConnectionManager {
 
   private static final Logger log = LoggerFactory.getLogger(HConnectionManager.class);
   private static final Logger perf4jLogger =
     LoggerFactory.getLogger("me.prettyprint.cassandra.hector.TimingLogger");
 
 
  private final NonBlockingHashMap<CassandraHost, ConcurrentHClientPool> hostPools;
  private final LoadBalancingPolicy loadBalancingPolicy;
  private final ClockResolution clock;
  private final ExceptionsTranslator exceptionsTranslator;

  private CassandraHostRetryService cassandraHostRetryService;
  private NodeAutoDiscoverService nodeAutoDiscoverService;
  private CassandraClientMonitor monitor;

   public HConnectionManager(CassandraHostConfigurator cassandraHostConfigurator) {
    loadBalancingPolicy = cassandraHostConfigurator.getLoadBalancingPolicy();
    clock = cassandraHostConfigurator.getClockResolution();
    hostPools = new NonBlockingHashMap<CassandraHost, ConcurrentHClientPool>();
    exceptionsTranslator = new ExceptionsTranslatorImpl();
    if ( cassandraHostConfigurator.getRetryDownedHosts() ) {
      cassandraHostRetryService = new CassandraHostRetryService(this, cassandraHostConfigurator);
     }
     for ( CassandraHost host : cassandraHostConfigurator.buildCassandraHosts() ) {
       try {
         ConcurrentHClientPool chcp = new ConcurrentHClientPool(host);
        hostPools.put(host, chcp);
       } catch (HectorTransportException hte) {
         .error("Could not start connection pool for host {}"host);
         if (  != null ) {
           .add(host);
         }
       }
     }
 
     if ( cassandraHostConfigurator.getAutoDiscoverHosts() ) {
      nodeAutoDiscoverService = new NodeAutoDiscoverService(this, cassandraHostConfigurator);
     }
    monitor = JmxMonitor.getInstance(this).getCassandraMonitor();
   }

  
  /**
   * Returns true if the host was successfully added. In any sort of failure, exceptions are
   * caught and logged, returning false.
   *
   * @param cassandraHost the host to add
   * @return true if the host was added; false otherwise
   */
   public boolean addCassandraHost(CassandraHost cassandraHost) {
     if ( !getHosts().contains(cassandraHost) ) {
       ConcurrentHClientPool pool = null;
       try {
         pool = new ConcurrentHClientPool(cassandraHost);
        hostPools.putIfAbsent(cassandraHost, pool);
        log.info("Added host {} to pool", cassandraHost.getName());
         return true;
       } catch (HectorTransportException hte) {
         .error("Transport exception host to HConnectionManager: " + cassandraHosthte);
       } catch (Exception ex) {
         .error("General exception host to HConnectionManager: " + cassandraHostex);
       }
     } else {
       .info("Host already existed for pool {}"cassandraHost.getName());
    }
    return false;
  }
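  // Illustrative usage sketch; "connectionManager", the host name, and the port are hypothetical:
  //
  //   CassandraHost newHost = new CassandraHost("cass3.example.com", 9160);
  //   if ( !connectionManager.addCassandraHost(newHost) ) {
  //     // false: the pool could not be created (or the host was already known); details are in the logs
  //   }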

  
  /**
   * Removes the {@link me.prettyprint.cassandra.service.CassandraHost} from the pool, bypassing the
   * retry service. This would be called on a host that is known to be going away. Gracefully shuts
   * down the underlying connections via {@link ConcurrentHClientPool#shutdown()}.
   *
   * @param cassandraHost the host to remove
   */
  public void removeCassandraHost(CassandraHost cassandraHost) {
    boolean removed = getHosts().contains(cassandraHost);
    if ( removed ) {
      ConcurrentHClientPool pool = hostPools.remove(cassandraHost);
      if ( pool != null ) {
        pool.shutdown();
      } else {
        removed = false;
        .info("removeCassandraHost attempt miss for CassandraHost {} May have been beaten by another thread?"cassandraHost);
      }
    }
    .info("Remove status for CassandraHost pool {} was {}"cassandraHostremoved);
  }
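  // Illustrative usage sketch ("connectionManager" and the host details are hypothetical):
  // decommissioning a node that is known to be going away, so its pool is shut down directly
  // instead of being handed to the retry service.
  //
  //   connectionManager.removeCassandraHost(new CassandraHost("cass2.example.com", 9160));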
  public Set<CassandraHost> getHosts() {
    return Collections.unmodifiableSet(hostPools.keySet());
  }
  public List<String> getStatusPerPool() {
    List<String> stats = new ArrayList<String>();
    for (ConcurrentHClientPool clientPool : hostPools.values()) {
        stats.add(clientPool.getStatusAsString());
    }
    return stats;
  }
  public void operateWithFailover(Operation<?> op) throws HectorException {
    final StopWatch stopWatch = new Slf4JStopWatch(perf4jLogger);
    int retries = Math.min(op.failoverPolicy.numRetries, hostPools.size());
    HThriftClient client = null;
    boolean success = false;
    boolean retryable = false;
    Set<CassandraHost> excludeHosts = new HashSet<CassandraHost>();
    // TODO start timer for limiting retry time spent
    while ( !success ) {
      try {
        // TODO how to 'timeout' on this op when underlying pool is exhausted
        client =  getClientFromLBPolicy(excludeHosts);
        Cassandra.Client c = client.getCassandra(op.keyspaceName);
        // Keyspace can be null for some system_* api calls
        if ( !op.credentials.isEmpty() ) {
          c.login(new AuthenticationRequest(op.credentials));
        }
        op.executeAndSetResult(c, client.cassandraHost);
        success = true;
        stopWatch.stop(op.stopWatchTagName + ".success_");
        break;
      } catch (Exception ex) {
        HectorException he = exceptionsTranslator.translate(ex);
        if ( he instanceof HInvalidRequestException || he instanceof HCassandraInternalException ) {
          throw he;
        } else if ( he instanceof HectorTransportException) {
          --retries;
          client.close();
          markHostAsDown(client);
          excludeHosts.add(client.cassandraHost);
          retryable = true;
          if ( retries > 0 ) {
            // retries remain: the while loop will pick another host via getClientFromLBPolicy
          }
        } else if (he instanceof HTimedOutException || he instanceof HUnavailableException ) {
          // DO NOT decrement retries; keep retrying on timeouts until the host comes back
          retryable = true;
          client.close();
          // TODO timecheck on how long we've been waiting on timeouts here
          // suggestion per user moores on hector-users
        } else if ( he instanceof PoolExhaustedException ) {
          retryable = true;
          --retries;
          if ( hostPools.size() == 1 ) {
            throw he;
          }
          excludeHosts.add(client.cassandraHost);
        }
        if ( retries <= 0 || !retryable ) throw he;
        log.error("Could not fulfill request on this host {}", client);
        log.error("Exception: ", he);
        sleepBetweenHostSkips(op.failoverPolicy);
      } finally {
        if ( !success ) {
          monitor.incCounter(op.failCounter);
          stopWatch.stop(op.stopWatchTagName + ".fail_");
        }
        releaseClient(client);
      }
    }
  }

  
    /**
     * Sleeps for the amount of time specified by sleepBetweenHostsMilli. In many cases failing over
     * to other hosts is done because the cluster is too busy, so the sleep between hosts may help
     * reduce load on the cluster.
     */
    private void sleepBetweenHostSkips(FailoverPolicy failoverPolicy) {
      if (failoverPolicy.sleepBetweenHostsMilli > 0) {
        if ( log.isDebugEnabled() ) {
          log.debug("Will sleep for {} millisec", failoverPolicy.sleepBetweenHostsMilli);
        }
        try {
          Thread.sleep(failoverPolicy.sleepBetweenHostsMilli);
        } catch (InterruptedException e) {
          .warn("Sleep between hosts interrupted"e);
        }
      }
    }
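  // Illustrative numbers (hypothetical policy values): with numRetries = 3 and
  // sleepBetweenHostsMilli = 200, a request that hits transport errors on two hosts pauses
  // roughly 200 ms after each failed attempt before the next host is tried; a value of 0
  // disables the pause entirely.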
  private HThriftClient getClientFromLBPolicy(Set<CassandraHost> excludeHosts) {
    HThriftClient client;
    if ( hostPools.isEmpty() ) {
      throw new HectorException("All host pools marked down. Retry burden pushed out to client.");
    }
    try {
      client = loadBalancingPolicy.getPool(hostPools.values(), excludeHosts).borrowClient();
    } catch (Exception e) {
      throw new HectorException("General exception in getClientFromLBPolicy",e);
    }
    return client;
  }
  void releaseClient(HThriftClient client) {
    if ( client == null ) return;
    ConcurrentHClientPool pool = hostPools.get(client.cassandraHost);
    if ( pool != null ) {
      pool.releaseClient(client);
    } else {
      .info("Client {} released to inactive or dead pool. Closing."client);
      client.close();
    }
  }

  HThriftClient borrowClient() {
    return getClientFromLBPolicy(null);
  }
  void markHostAsDown(HThriftClient client) {
    .error("MARK HOST AS DOWN TRIGGERED for host {}"client.cassandraHost.getName());
    ConcurrentHClientPool pool = .remove(client.cassandraHost);
    if ( pool != null ) {
      .error("Pool state on shutdown: {}"pool.getStatusAsString());
      pool.shutdown();
      cassandraHostRetryService.add(client.cassandraHost);
    }
    client.close();
  }

  public Collection<ConcurrentHClientPool> getActivePools() {
    return Collections.unmodifiableCollection(hostPools.values());
  }
  public long createClock() {
    return this.clock.createClock();
  }
  public void shutdown() {
    .info("Shutdown called on HConnectionManager");
    if (  != null )
    if (  != null )
    for (ConcurrentHClientPool pool : .values()) {
      try {
        pool.shutdown();
      } catch (IllegalArgumentException iae) {
        .error("Out of order in HConnectionManager shutdown()?: {}"iae.getMessage());
      }
    }
  }
}
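
A minimal usage sketch for the class above, assuming the CassandraHostConfigurator setters that
match the getters used in the constructor (setRetryDownedHosts, setAutoDiscoverHosts) and a
hypothetical single node at localhost:9160:

  CassandraHostConfigurator configurator = new CassandraHostConfigurator("localhost:9160");
  configurator.setRetryDownedHosts(true);    // downed hosts are handed to CassandraHostRetryService
  configurator.setAutoDiscoverHosts(false);  // no NodeAutoDiscoverService for a single test node

  HConnectionManager connectionManager = new HConnectionManager(configurator);
  for (String status : connectionManager.getStatusPerPool()) {
    System.out.println(status);              // one status line per ConcurrentHClientPool
  }
  connectionManager.shutdown();              // stops background services and shuts down each pool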