   /*
    *      Copyright (C) 2012 DataStax Inc.
    *
    *   Licensed under the Apache License, Version 2.0 (the "License");
    *   you may not use this file except in compliance with the License.
    *   You may obtain a copy of the License at
    *
    *      http://www.apache.org/licenses/LICENSE-2.0
    *
   *   Unless required by applicable law or agreed to in writing, software
   *   distributed under the License is distributed on an "AS IS" BASIS,
   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *   See the License for the specific language governing permissions and
   *   limitations under the License.
   */
  package com.datastax.driver.core;
  
   import java.net.InetAddress;
   import java.net.UnknownHostException;
   import java.util.List;
   import java.util.Set;
   import java.util.concurrent.TimeUnit;
   
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
Information and known state of a Cassandra cluster.

This is the main entry point of the driver. A simple example of access to a Cassandra cluster would be:

   Cluster cluster = Cluster.builder().addContactPoint("192.168.0.1").build();
   Session session = cluster.connect("db1");

   for (Row row : session.execute("SELECT * FROM table1"))
       // do something ...
 

A cluster object maintains a permanent connection to one of the cluster nodes which it uses solely to maintain information on the state and current topology of the cluster. Using the connection, the driver will discover all the nodes currently in the cluster as well as new nodes joining the cluster subsequently.
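
For illustration (not part of this class), the discovered topology can be inspected through the cluster metadata. A minimal sketch, where the contact point address is a placeholder:

    Cluster cluster = Cluster.builder().addContactPoint("192.168.0.1").build();
    for (Host host : cluster.getMetadata().getAllHosts())
        System.out.println(host.getAddress() + " is " + (host.isUp() ? "up" : "down"));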

  
  public class Cluster {
  
      private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
  
      private static final int DEFAULT_THREAD_KEEP_ALIVE = 30;
  
      final Manager manager;

    
Constructs a new Cluster instance.

This constructor is mainly exposed so that Cluster can be sub-classed as a means to make testing/mocking easier or to "intercept" its method calls. However, most users shouldn't extend this class and should prefer either using builder() or calling buildFrom(com.datastax.driver.core.Cluster.Initializer) with a custom Initializer.

Parameters:
contactPoints the list of contact points to use for the new cluster.
configuration the configuration for the new cluster.
init whether or not initialization should be performed by this constructor. Passing false is equivalent to using Cluster.Builder.withDeferredInitialization() on a Cluster.Builder.
  
      protected Cluster(List<InetAddress> contactPoints, Configuration configuration, boolean init) {
          // Note: we don't want to make init part of Configuration. In 2.0, the default is to not init,
          // so there is no point in breaking the Configuration API for that. However, as a workaround
          // until upgrade, we still want to allow optional lazy initialization of the control
          // connection (see #JAVA-161)
          this.manager = new Manager(contactPoints, configuration);
          if (init)
              this.manager.init();
      }
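
For illustration only, a minimal sketch of the sub-classing use case described above, reusing a Builder (which implements Initializer) to supply the contact points and configuration. The TracingCluster name and the println interception are hypothetical:

    class TracingCluster extends Cluster {
        TracingCluster(Cluster.Initializer initializer) {
            super(initializer.getContactPoints(), initializer.getConfiguration(), true);
        }

        @Override
        public Session connect(String keyspace) {
            // Purely illustrative interception of a method call.
            System.out.println("connect(" + keyspace + ") called");
            return super.connect(keyspace);
        }
    }

    // Usage: Cluster.Builder is itself an Initializer.
    Cluster cluster = new TracingCluster(Cluster.builder().addContactPoint("127.0.0.1"));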

    
Build a new cluster based on the provided initializer.

Note that for building a cluster programmatically, Cluster.Builder provides a slightly less verbose shortcut with Cluster.Builder.build().

Also note that all the contact points provided by initializer must share the same port.

Parameters:
initializer the Cluster.Initializer to use
Returns:
the newly created Cluster instance
Throws:
com.datastax.driver.core.exceptions.NoHostAvailableException if no host amongst the contact points can be reached.
java.lang.IllegalArgumentException if the list of contact points provided by initializer is empty or if not all those contact points have the same port.
com.datastax.driver.core.exceptions.AuthenticationException if an authentication error occurs while contacting the initial contact points.
 
     public static Cluster buildFrom(Initializer initializer) {
         List<InetAddress> contactPoints = initializer.getContactPoints();
         if (contactPoints.isEmpty())
             throw new IllegalArgumentException("Cannot build a cluster without contact points");
 
         return new Cluster(contactPoints, initializer.getConfiguration(), true);
     }

    
Creates a new Cluster.Builder instance.

This is a convenience method for new Cluster.Builder().

Returns:
the new cluster builder.
 
     public static Cluster.Builder builder() {
         return new Cluster.Builder();
     }

    
Creates a new session on this cluster.

Returns:
a new session on this cluster set to no keyspace.
 
     public Session connect() {
         manager.init(); // Calls init if deferInitialization was used. It's a no-op if it's already initialized.
         return manager.newSession();
     }

    
Creates a new session on this cluster and sets the keyspace to the provided one.

Parameters:
keyspace The name of the keyspace to use for the created Session.
Returns:
a new session on this cluster set to the provided keyspace.
Throws:
com.datastax.driver.core.exceptions.NoHostAvailableException if no host can be contacted to set the keyspace.
 
     public Session connect(String keyspace) {
         SessionManager session = (SessionManager)connect();
         try {
             session.setKeyspace(keyspace);
         } catch (RuntimeException e) {
             session.shutdown();
             throw e;
         }
         return session;
     }
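
For example (illustrative usage, not part of this class; the keyspace, table and column names are placeholders):

    Cluster cluster = Cluster.builder().addContactPoint("192.168.0.1").build();
    Session session = cluster.connect("demo");
    for (Row row : session.execute("SELECT name FROM users"))
        System.out.println(row.getString("name"));
    cluster.shutdown();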

    
Returns read-only metadata on the connected cluster.

This includes the known nodes with their status as seen by the driver, as well as the schema definitions.

Returns:
the cluster metadata.
 
     public Metadata getMetadata() {
         manager.init(); // Calls init if deferInitialization was used. It's a no-op if it's already initialized.
         return manager.metadata;
     }

    
The cluster configuration.

Returns:
the cluster configuration.
 
     public Configuration getConfiguration() {
         return manager.configuration;
     }

    
The cluster metrics.

Returns:
the cluster metrics, or null if metrics collection has been disabled (that is if Configuration.getMetricsOptions() returns null).
 
     public Metrics getMetrics() {
         return manager.metrics;
     }

    
Registers the provided listener to be notified on hosts up/down/added/removed events.

Registering the same listener multiple times is a no-op.

Note that while com.datastax.driver.core.policies.LoadBalancingPolicy implements Host.StateListener, the configured load balancing policy does not need to (and should not) be registered through this method to receive host-related events.

Parameters:
listener the new Host.StateListener to register.
Returns:
this Cluster object;
 
     public Cluster register(Host.StateListener listener) {
         manager.listeners.add(listener);
         return this;
     }
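
For illustration, a listener that simply logs host events. The four callbacks shown are the Host.StateListener methods; the println bodies are placeholders:

    cluster.register(new Host.StateListener() {
        @Override public void onAdd(Host host)    { System.out.println("added " + host); }
        @Override public void onUp(Host host)     { System.out.println("up " + host); }
        @Override public void onDown(Host host)   { System.out.println("down " + host); }
        @Override public void onRemove(Host host) { System.out.println("removed " + host); }
    });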

    
Unregisters the provided listener from being notified on hosts events.

This method is a no-op if listener had not previously been registered against this Cluster.

Parameters:
listener the Host.StateListener to unregister.
Returns:
this Cluster object;
 
     public Cluster unregister(Host.StateListener listener) {
         manager.listeners.remove(listener);
         return this;
     }

    
Registers the provided tracker to be updated with hosts read latencies.

Registering the same listener multiple times is a no-op.

Be wary that the registered tracker's update method will be called very frequently (at the end of every query to a Cassandra host) and should therefore not be costly.

The main use case for a LatencyTracker is to let com.datastax.driver.core.policies.LoadBalancingPolicy implementations be latency aware. Typically, com.datastax.driver.core.policies.LatencyAwarePolicy registers its own internal LatencyTracker (automatically; you don't have to call this method directly).

Parameters:
tracker the new LatencyTracker to register.
Returns:
this Cluster object;
 
     public Cluster register(LatencyTracker tracker) {
         manager.trackers.add(tracker);
         return this;
     }
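
For illustration, a deliberately cheap tracker; the 50 ms threshold is an arbitrary example value:

    cluster.register(new LatencyTracker() {
        @Override
        public void update(Host host, long newLatencyNanos) {
            // Called after every query, so keep this cheap.
            if (newLatencyNanos > 50000000L)
                System.out.println("Slow response (" + newLatencyNanos + " ns) from " + host);
        }
    });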

    
Unregisters the provided latency tracker from being updated with host read latencies.

This method is a no-op if tracker had not previously been registered against this Cluster.

Parameters:
tracker the LatencyTracker to unregister.
Returns:
this Cluster object;
 
     public Cluster unregister(LatencyTracker tracker) {
         manager.trackers.remove(tracker);
         return this;
     }

    
Shuts down this cluster instance. This closes all connections from all the sessions of this Cluster instance and reclaims all resources used by it.

This method waits indefinitely for the driver to shut down.

This method has no effect if the cluster was already shut down.

 
     public void shutdown() {
         shutdown(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
     }

    
Shuts down this cluster instance, waiting only a definite amount of time. This closes all connections from all the sessions of this Cluster instance and reclaims all resources used by it.

Note that this method is not thread safe in the sense that if another shutdown is performed in parallel, it might return true even if the instance is not yet fully shut down.

Parameters:
timeout how long to wait for the cluster instance to shutdown.
unit the unit for the timeout.
Returns:
true if the instance has been properly shutdown within the timeout, false otherwise.
 
     public boolean shutdown(long timeout, TimeUnit unit) {
         try {
             return manager.shutdown(timeout, unit);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             return false;
         }
     }
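
For example, a bounded shutdown at application exit (the 10-second timeout is an arbitrary choice):

    if (!cluster.shutdown(10, TimeUnit.SECONDS))
        System.err.println("Cluster did not shut down cleanly within 10 seconds");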

    
Initializer for Cluster instances.

If you want to create a new Cluster instance programmatically, then it is advised to use Cluster.Builder which can be obtained from the Cluster.builder() method.

But it is also possible to implement a custom Initializer that retrieves the initialization information from, for example, a web service or a configuration file.

 
     public interface Initializer {

        
Returns the initial Cassandra hosts to connect to.

Returns:
the initial Cassandra contact points. See Cluster.Builder.addContactPoint(java.lang.String) for more details on contact points.
 
         public List<InetAddress> getContactPoints();

        
The configuration to use for the new cluster.

Note that some configuration options can be modified after cluster initialization, while others cannot be changed once the cluster has been built.

Returns:
the configuration to use for the new cluster.
 
         public Configuration getConfiguration();
     }
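
For illustration, a hand-rolled Initializer. The hard-coded addresses stand in for whatever external source (configuration file, web service) the initialization information actually comes from, and the configuration is borrowed from a default Builder:

    Cluster.Initializer initializer = new Cluster.Initializer() {
        @Override
        public List<InetAddress> getContactPoints() {
            try {
                return Arrays.asList(InetAddress.getByName("192.168.0.1"),
                                     InetAddress.getByName("192.168.0.2"));
            } catch (UnknownHostException e) {
                throw new IllegalArgumentException(e);
            }
        }

        @Override
        public Configuration getConfiguration() {
            // Reuse the driver defaults that a plain Builder would produce.
            return Cluster.builder().getConfiguration();
        }
    };
    Cluster cluster = Cluster.buildFrom(initializer);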

    
Helper class to build Cluster instances.
 
     public static class Builder implements Initializer {
 
         private final List<InetAddress> addresses = new ArrayList<InetAddress>();
         private int port = ProtocolOptions.DEFAULT_PORT;
         private AuthInfoProvider authProvider = AuthInfoProvider.NONE;
         private ProtocolOptions.Compression compression = ProtocolOptions.Compression.NONE;
 
         private LoadBalancingPolicy loadBalancingPolicy;
         private ReconnectionPolicy reconnectionPolicy;
         private RetryPolicy retryPolicy;
 
         private SSLOptions sslOptions = null;
         private boolean metricsEnabled = true;
         private boolean jmxEnabled = true;
         private PoolingOptions poolingOptions = new PoolingOptions();
         private SocketOptions socketOptions = new SocketOptions();
 
         private boolean deferInitialization = false;
 
         @Override
         public List<InetAddress> getContactPoints() {
             return addresses;
         }

        
The port to use to connect to the Cassandra host. If not set through this method, the default port (9042) will be used instead.

Parameters:
port the port to set.
Returns:
this Builder
 
         public Builder withPort(int port) {
             this.port = port;
             return this;
         }

        
Adds a contact point. Contact points are addresses of Cassandra nodes that the driver uses to discover the cluster topology. Only one contact point is required (the driver will retrieve the address of the other nodes automatically), but it is usually a good idea to provide more than one contact point, because if that single contact point is unavailable, the driver cannot initialize itself correctly.

Parameters:
address the address of the node to connect to
Returns:
this Builder
Throws:
java.lang.IllegalArgumentException if no IP address for address could be found
java.lang.SecurityException if a security manager is present and permission to resolve the host name is denied.
 
         public Builder addContactPoint(String address) {
             try {
                 this.addresses.add(InetAddress.getByName(address));
                 return this;
             } catch (UnknownHostException e) {
                 throw new IllegalArgumentException(e.getMessage());
             }
         }

        
Adds contact points. See addContactPoint(java.lang.String) for more details on contact points.

Parameters:
addresses addresses of the nodes to add as contact point
Returns:
this Builder
Throws:
java.lang.IllegalArgumentException if no IP address for at least one of addresses could be found
java.lang.SecurityException if a security manager is present and permission to resolve the host name is denied.
See also:
addContactPoint(java.lang.String)
 
         public Builder addContactPoints(String... addresses) {
             for (String address : addresses)
                 addContactPoint(address);
             return this;
         }

        
Adds contact points. See addContactPoint(java.lang.String) for more details on contact points.

Parameters:
addresses addresses of the nodes to add as contact point
Returns:
this Builder
See also:
addContactPoint(java.lang.String)
 
         public Builder addContactPoints(InetAddress... addresses) {
             for (InetAddress address : addresses)
                 this.addresses.add(address);
             return this;
         }

        
Configures the load balancing policy to use for the new cluster.

If no load balancing policy is set through this method, com.datastax.driver.core.policies.Policies.defaultLoadBalancingPolicy() will be used instead.

Parameters:
policy the load balancing policy to use
Returns:
this Builder
 
         public Builder withLoadBalancingPolicy(LoadBalancingPolicy policy) {
             this.loadBalancingPolicy = policy;
             return this;
         }

        
Configures the reconnection policy to use for the new cluster.

If no reconnection policy is set through this method, com.datastax.driver.core.policies.Policies.defaultReconnectionPolicy() will be used instead.

Parameters:
policy the reconnection policy to use
Returns:
this Builder
 
         public Builder withReconnectionPolicy(ReconnectionPolicy policy) {
             this.reconnectionPolicy = policy;
             return this;
         }

        
Configures the retry policy to use for the new cluster.

If no retry policy is set through this method, com.datastax.driver.core.policies.Policies.defaultRetryPolicy() will be used instead.

Parameters:
policy the retry policy to use
Returns:
this Builder
 
         public Builder withRetryPolicy(RetryPolicy policy) {
             this.retryPolicy = policy;
             return this;
         }

        
Uses the provided credentials when connecting to Cassandra hosts.

This should be used if the Cassandra cluster has been configured to use the PasswordAuthenticator. If the default AllowAllAuthenticator is used instead, using this method has no effect.

Parameters:
username the username to use to login to Cassandra hosts.
password the password corresponding to username.
Returns:
this Builder
 
         public Builder withCredentials(String username, String password) {
             this.authProvider = new AuthInfoProvider.Simple(username, password);
             return this;
         }

        
Sets the compression to use for the transport.

Parameters:
compression the compression to set
Returns:
this Builder
See also:
ProtocolOptions.Compression
 
         public Builder withCompression(ProtocolOptions.Compression compression) {
             this.compression = compression;
             return this;
         }

        
Disables metrics collection for the created cluster (metrics are enabled by default otherwise).

Returns:
this builder
 
         public Builder withoutMetrics() {
             this.metricsEnabled = false;
             return this;
         }

        
Enables the use of SSL for the created Cluster.

Calling this method will use the default SSL options (see SSLOptions()). It is thus a shortcut for withSSL(new SSLOptions()). Note that if SSL is enabled, the driver will not connect to any Cassandra node that doesn't have SSL enabled, and it is strongly advised to enable SSL on every Cassandra node if you plan on using SSL in the driver.

Returns:
this builder
 
         public Builder withSSL() {
             this.sslOptions = new SSLOptions();
             return this;
         }

        
Enable the use of SSL for the created Cluster using the provided options.

Parameters:
sslOptions the SSL options to use.
Returns:
this builder
 
         public Builder withSSL(SSLOptions sslOptions) {
             this.sslOptions = sslOptions;
             return this;
         }

        
Disables JMX reporting of the metrics.

JMX reporting is enabled by default (see Metrics) but can be disabled using this option. If metrics are disabled, this is a no-op.

Returns:
this builder
 
         public Builder withoutJMXReporting() {
             this.jmxEnabled = false;
             return this;
         }

        
Return the pooling options used by this builder.

Deprecated:
you are now encouraged to use the withPoolingOptions(com.datastax.driver.core.PoolingOptions) method. This method is deprecated and will be removed in the next major version of the driver.
Returns:
the pooling options that will be used by this builder. You can use the returned object to define the initial pooling options for the built cluster.
 
         @Deprecated
         public PoolingOptions poolingOptions() {
             return poolingOptions;
         }

        
Set the PoolingOptions to use for the newly created Cluster.

If no pooling options are set through this method, default pooling options will be used.

Parameters:
options the pooling options to use.
Returns:
this builder.
 
         public Builder withPoolingOptions(PoolingOptions options) {
             this.poolingOptions = options;
             return this;
         }
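
For example, a sketch of tuning the pool before building; the particular setter (setCoreConnectionsPerHost) and the value 2 are illustrative assumptions, not recommendations:

    PoolingOptions pooling = new PoolingOptions();
    pooling.setCoreConnectionsPerHost(HostDistance.LOCAL, 2);

    Cluster cluster = Cluster.builder()
        .addContactPoint("127.0.0.1")
        .withPoolingOptions(pooling)
        .build();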

        
Returns the socket options used by this builder.

Deprecated:
you are now encouraged to use the withSocketOptions(com.datastax.driver.core.SocketOptions) method. This method is deprecated and will be removed in the next major version of the driver.
Returns:
the socket options that will be used by this builder. You can use the returned object to define the initial socket options for the built cluster.
 
         @Deprecated
         public SocketOptions socketOptions() {
             return socketOptions;
         }

        
Set the SocketOptions to use for the newly created Cluster.

If no socket options are set through this method, default socket options will be used.

Parameters:
options the socket options to use.
Returns:
this builder.
 
         public Builder withSocketOptions(SocketOptions options) {
             this.socketOptions = options;
             return this;
         }

        
Defer the initialization of the created cluster.

By default, building the cluster (calling the build() method of this object) triggers the creation of a connection to one of the contact points. That connection is then used to fetch the metadata on the Cassandra cluster we are connected to (other nodes, schema, ...). If this method is used, the creation of that connection will be deferred until the first call to connect() or getMetadata() on the resulting Cluster object.

This method is useful when it is not convenient to deal with connection problems while creating the Cluster object. Note that this method only exists in the 1.X branch of the driver since deferred initialization is the default on the 2.X branch.

Returns:
this builder.
 
         public Builder withDeferredInitialization() {
             this.deferInitialization = true;
             return this;
         }
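
For illustration, with deferred initialization nothing touches the network until the cluster is first used:

    Cluster cluster = Cluster.builder()
        .addContactPoint("192.168.0.1")
        .withDeferredInitialization()
        .build();                        // no connection attempted yet

    Session session = cluster.connect(); // the control connection is created here,
                                         // so NoHostAvailableException can be thrown here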

        
The configuration that will be used for the new cluster.

You should not modify this object directly, because changes made to the returned object may not be taken into account by the built cluster. Instead, you should use the other methods of this Builder.

Returns:
the configuration to use for the new cluster.
 
         @Override
         public Configuration getConfiguration() {
             Policies policies = new Policies(
                  loadBalancingPolicy == null ? Policies.defaultLoadBalancingPolicy() : loadBalancingPolicy,
                  reconnectionPolicy == null ? Policies.defaultReconnectionPolicy() : reconnectionPolicy,
                  retryPolicy == null ? Policies.defaultRetryPolicy() : retryPolicy
             );
             return new Configuration(policies,
                                      new ProtocolOptions(port, sslOptions).setCompression(compression),
                                      poolingOptions,
                                      socketOptions,
                                      authProvider,
                                      metricsEnabled ? new MetricsOptions(jmxEnabled) : null);
         }

        
Builds the cluster with the configured set of initial contact points and policies. This is a convenience method for Cluster.buildFrom(this).

Returns:
the newly built Cluster instance.
Throws:
com.datastax.driver.core.exceptions.NoHostAvailableException if none of the contact points provided can be reached.
com.datastax.driver.core.exceptions.AuthenticationException if an authentication error occurs while contacting the initial contact points.
 
         public Cluster build() {
             List<InetAddress> contactPoints = getContactPoints();
             if (contactPoints.isEmpty())
                 throw new IllegalArgumentException("Cannot build a cluster without contact points");
 
             return new Cluster(contactPoints, getConfiguration(), !deferInitialization);
         }
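
Putting the Builder together, a typical (illustrative) construction; the credentials and the explicit DefaultRetryPolicy.INSTANCE are example choices, not recommendations:

    Cluster cluster = Cluster.builder()
        .addContactPoints("192.168.0.1", "192.168.0.2")
        .withPort(9042)
        .withCredentials("cassandra", "cassandra")
        .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
        .withoutJMXReporting()
        .build();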
     }
 
     private static ThreadFactory threadFactory(String nameFormat) {
         return new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
     }
 
     static long timeSince(long startNanos, TimeUnit destUnit) {
         return destUnit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
     }
 
     private static ListeningExecutorService makeExecutor(int threads, String name) {
         ThreadPoolExecutor executor = new ThreadPoolExecutor(threads,
                                                              threads,
                                                              DEFAULT_THREAD_KEEP_ALIVE,
                                                              TimeUnit.SECONDS,
                                                              new LinkedBlockingQueue<Runnable>(),
                                                              threadFactory(name));
 
         executor.allowCoreThreadTimeOut(true);
         return MoreExecutors.listeningDecorator(executor);
     }

    
The sessions and hosts managed by this Cluster instance.

Note: the reason we create a Manager object separate from Cluster is that Manager is not publicly visible. For instance, we wouldn't want users to be able to call the onUp(com.datastax.driver.core.Host) and onDown(com.datastax.driver.core.Host) methods.
 
 
     class Manager implements Host.StateListener, Connection.DefaultResponseHandler {
         private boolean isInit;
 
         // Initial contacts point
         final List<InetAddress> contactPoints;
         final Set<SessionManager> sessions = new CopyOnWriteArraySet<SessionManager>();
 
         final Metadata metadata;
         final Configuration configuration;
         final Metrics metrics;
 
         final Connection.Factory connectionFactory;
         final ControlConnection controlConnection;
 
 
         final ScheduledExecutorService reconnectionExecutor = Executors.newScheduledThreadPool(2, threadFactory("Reconnection-%d"));
         // scheduledTasksExecutor is used to process C* notifications. So having it mono-threaded ensures notifications are
         // applied in the order received.
         final ScheduledExecutorService scheduledTasksExecutor = Executors.newScheduledThreadPool(1, threadFactory("Scheduled Tasks-%d"));
 
         // Executor used for tasks that shouldn't be executed on an IO thread. Used for short-lived, generally non-blocking tasks
         final ListeningExecutorService executor;
 
         // An executor for tasks that might block some time, like creating new connections, but are generally not too critical.
         final ListeningExecutorService blockingTasksExecutor;
 
         final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
         // All the queries that have been prepared (we keep them so we can re-prepare them when a node fails or a
         // new one joins the cluster).
         // Note: we could move this down to the session level, but since prepared statements are global to a node,
         // this would yield a slightly less clear behavior.
         final Map<MD5Digest, PreparedStatement> preparedQueries = new MapMaker().weakValues().makeMap();
 
         final Set<Host.StateListener> listeners = new CopyOnWriteArraySet<Host.StateListener>();
         final Set<LatencyTracker> trackers = new CopyOnWriteArraySet<LatencyTracker>();
 
         private Manager(List<InetAddress> contactPoints, Configuration configuration) {
             logger.debug("Starting new cluster with contact points " + contactPoints);
 
             this.executor = makeExecutor(Runtime.getRuntime().availableProcessors(), "Cassandra Java Driver worker-%d");
             this.blockingTasksExecutor = makeExecutor(2, "Cassandra Java Driver blocking tasks worker-%d");
 
             this.configuration = configuration;
             this.metadata = new Metadata(this);
             this.contactPoints = contactPoints;
             this.connectionFactory = new Connection.Factory(this, configuration.getAuthInfoProvider());
 
             this.controlConnection = new ControlConnection(this);
 
             this.metrics = configuration.getMetricsOptions() == null ? null : new Metrics(this);
             this.configuration.register(this);
         }
 
         // Initialization is not too performance intensive and in practice there shouldn't be contention
         // on it so synchronized is good enough.
         private synchronized void init() {
             if (isInit)
                 return;
             isInit = true;
 
             // Note: we mark the initial contact point as UP, because we have no prior
             // notion of their state and no real way to know until we connect to them
             // (since the node status is not exposed by C* in the System tables). This
             // may not be correct.
             for (InetAddress address : contactPoints) {
                 Host host = addHost(address, false);
                 if (host != null)
                     host.setUp();
             }
 
             loadBalancingPolicy().init(Cluster.this, metadata.allHosts());
 
             try {
                 controlConnection.connect();
             } catch (NoHostAvailableException e) {
                 try {
                     shutdown(0, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
                 }
                 throw e;
             }
 
         }
 
         Cluster getCluster() {
             return Cluster.this;
         }
 
         LoadBalancingPolicy loadBalancingPolicy() {
             return configuration.getPolicies().getLoadBalancingPolicy();
         }
 
         ReconnectionPolicy reconnectionPolicy() {
             return configuration.getPolicies().getReconnectionPolicy();
         }
 
         private Session newSession() {
             init();
 
             SessionManager session = new SessionManager(Cluster.this, metadata.allHosts());
             sessions.add(session);
             return session;
         }
 
         void reportLatency(Host host, long latencyNanos) {
             for (LatencyTracker tracker : trackers) {
                 tracker.update(host, latencyNanos);
             }
         }
 
         private boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
 
             if (!isShutdown.compareAndSet(false, true))
                 return true;
 
             logger.debug("Shutting down");
 
             long start = System.nanoTime();
             boolean success = true;
 
             success &= controlConnection.shutdown(timeout, unit);
 
             for (Session session : sessions)
                 success &= session.shutdown(timeout - timeSince(start, unit), unit);
 
             reconnectionExecutor.shutdown();
             scheduledTasksExecutor.shutdown();
             executor.shutdown();
 
             success &= connectionFactory.shutdown(timeout - timeSince(start, unit), unit);
 
             if (metrics != null)
                 metrics.shutdown();
 
             // Note that it's on purpose that we shutdown everything *even* if the timeout
             // is reached early
             return success
                 && reconnectionExecutor.awaitTermination(timeout - timeSince(start, unit), unit)
                 && scheduledTasksExecutor.awaitTermination(timeout - timeSince(start, unit), unit)
                 && executor.awaitTermination(timeout - timeSince(start, unit), unit);
         }
 
         @Override
         public void onUp(final Host host) {
             logger.trace("Host {} is UP", host);
 
             if (isShutdown.get())
                 return;
 
             if (host.isUp())
                 return;
 
             // If there is a reconnection attempt scheduled for that node, cancel it
             ScheduledFuture<?> scheduledAttempt = host.reconnectionAttempt.getAndSet(null);
             if (scheduledAttempt != null) {
                 logger.debug("Cancelling reconnection attempt since node is UP");
                 scheduledAttempt.cancel(false);
             }
 
             try {
                 prepareAllQueries(host);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 // Don't propagate because we don't want to prevent other listener to run
             }
 
             // Session#onUp() expects the load balancing policy to have been updated first, so that
             // Host distances are up to date. This mean the policy could return the node before the
             // new pool have been created. This is harmless if there is no prior pool since RequestHandler
             // will ignore the node, but we do want to make sure there is no prior pool so we don't
             // query from a pool we will shutdown right away.
             for (SessionManager s : sessions)
                 s.removePool(host);
             loadBalancingPolicy().onUp(host);
             controlConnection.onUp(host);
 
             List<ListenableFuture<Boolean>> futures = new ArrayList<ListenableFuture<Boolean>>(sessions.size());
             for (SessionManager s : sessions)
                 futures.add(s.addOrRenewPool(host, false));
 
             // Only mark the node up once all session have re-added their pool (if the loadbalancing
             // policy says it should), so that Host.isUp() don't return true before we're reconnected
             // to the node.
             Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
                 public void onSuccess(List<Boolean> poolCreationResults) {
                     // If any of the creation failed, they will have signaled a connection failure
                     // which will trigger a reconnection to the node. So don't bother marking UP.
                     if (Iterables.any(poolCreationResults, Predicates.equalTo(false))) {
                         logger.debug("Connection pool cannot be created, not marking {} UP", host);
                         return;
                     }
 
                     host.setUp();
 
                     for (Host.StateListener listener : listeners)
                         listener.onUp(host);
 
                     // Now, check if there isn't pools to create/remove following the addition.
                     // We do that now only so that it's not called before we've set the node up.
                     for (SessionManager s : sessions)
                         s.updateCreatedPools();
                 }
 
                 public void onFailure(Throwable t) {
                     // That future is not really supposed to throw unexpected exceptions
                     if (!(t instanceof InterruptedException))
                         logger.error("Unexpected error while marking node UP: while this shouldn't happen, this shouldn't be critical", t);
                 }
             });
         }
 
         @Override
         public void onDown(final Host host) {
             onDown(host, false);
         }
 
         public void onDown(final Host host, final boolean isHostAddition) {
             logger.trace("Host {} is DOWN", host);
 
             if (isShutdown.get())
                 return;
 
             // Note: we don't want to skip that method if !host.isUp() because we set isUp
             // late in onUp, and so we can rely on isUp if there is an error during onUp.
             // But if there is a reconnection attempt in progress already, then we know
             // we've already gone through that method since the last successful onUp(), so
             // we're good skipping it.
             if (host.reconnectionAttempt.get() != null)
                 return;
 
             boolean wasUp = host.isUp();
             host.setDown();
 
             loadBalancingPolicy().onDown(host);
             controlConnection.onDown(host);
             for (SessionManager s : sessions)
                 s.onDown(host);
 
             // Contrarily to other actions of that method, there is no reason to notify listeners
             // unless the host was UP at the beginning of this function since even if a onUp fail
             // mid-method, listeners won't  have been notified of the UP.
             if (wasUp) {
                 for (Host.StateListener listener : listeners)
                     listener.onDown(host);
             }
 
             // Note: we basically waste the first successful reconnection, but it's probably not a big deal
             logger.debug("{} is down, scheduling connection retries", host);
             new AbstractReconnectionHandler(reconnectionExecutor, reconnectionPolicy().newSchedule(), host.reconnectionAttempt) {
 
                 protected Connection tryReconnect() throws ConnectionException, InterruptedException {
                     return connectionFactory.open(host);
                 }
 
                 protected void onReconnection(Connection connection) {
                     logger.debug("Successful reconnection to {}, setting host UP", host);
                     if (isHostAddition)
                         onAdd(host);
                     else
                         onUp(host);
                }
                protected boolean onConnectionException(ConnectionException e, long nextDelayMs) {
                    if (logger.isDebugEnabled())
                        logger.debug("Failed reconnection to {} ({}), scheduling retry in {} milliseconds", new Object[]{ host, e.getMessage(), nextDelayMs });
                    return true;
                }
                protected boolean onUnknownException(Exception e, long nextDelayMs) {
                    logger.error(String.format("Unknown error during control connection reconnection, scheduling retry in %d milliseconds", nextDelayMs), e);
                    return true;
                }
            }.start();
        }
        @Override
        public void onAdd(final Host host) {
            logger.trace("Adding new host {}", host);
            if (isShutdown.get())
                return;
            // Adds to the load balancing first and foremost, as doing so might change the decision
            // it will make for distance() on that node (not likely but we leave that possibility).
            // This does mean the policy may start returning that node for query plan, but as long
            // as no pools have been created (below) this will be ignored by RequestHandler so it's fine.
            loadBalancingPolicy().onAdd(host);
            // Next, if the host should be ignored, well, ignore it.
            if (loadBalancingPolicy().distance(host) == HostDistance.IGNORED) {
                // We still mark the node UP though as it should be (and notify the listeners).
                // We'll mark it down if we have a notification anyway and we've documented that especially
                // for IGNORED hosts, the isUp() method is a best effort guess
                host.setUp();
                for (Host.StateListener listener : listeners)
                    listener.onAdd(host);
                return;
            }
            try {
                prepareAllQueries(host);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Don't propagate because we don't want to prevent other listener to run
            }
            controlConnection.onAdd(host);
            List<ListenableFuture<Boolean>> futures = new ArrayList<ListenableFuture<Boolean>>(sessions.size());
            for (SessionManager s : sessions)
                futures.add(s.addOrRenewPool(host, true));
            // Only mark the node up once all session have added their pool (if the loadbalancing
            // policy says it should), so that Host.isUp() don't return true before we're reconnected
            // to the node.
            Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
                public void onSuccess(List<Boolean> poolCreationResults) {
                    // If any of the creation failed, they will have signaled a connection failure
                    // which will trigger a reconnection to the node. So don't bother marking UP.
                    if (Iterables.any(poolCreationResults, Predicates.equalTo(false))) {
                        logger.debug("Connection pool cannot be created, not marking {} UP", host);
                        return;
                    }
                    host.setUp();
                    for (Host.StateListener listener : listeners)
                        listener.onAdd(host);
                    // Now, check if there isn't pools to create/remove following the addition.
                    // We do that now only so that it's not called before we've set the node up.
                    for (SessionManager s : sessions)
                        s.updateCreatedPools();
                }
                public void onFailure(Throwable t) {
                    // That future is not really supposed to throw unexpected exceptions
                    if (!(t instanceof InterruptedException))
                        logger.error("Unexpected error while adding node: while this shouldn't happen, this shouldn't be critical", t);
                }
            });
        }
        @Override
        public void onRemove(Host host) {
            if (isShutdown.get())
                return;
            host.setDown();
            logger.trace("Removing host {}", host);
            loadBalancingPolicy().onRemove(host);
            controlConnection.onRemove(host);
            for (SessionManager s : sessions)
                s.onRemove(host);
            for (Host.StateListener listener : listeners)
                listener.onRemove(host);
        }
        public boolean signalConnectionFailure(Host host, ConnectionException exception, boolean isHostAddition) {
            boolean isDown = host.signalConnectionFailure(exception);
            if (isDown)
                onDown(host, isHostAddition);
            return isDown;
        }
        public Host addHost(InetAddress address, boolean signal) {
            Host newHost = metadata.add(address);
            if (newHost != null && signal) {
                logger.info("New Cassandra host {} added", newHost);
                onAdd(newHost);
            }
            return newHost;
        }
        public void removeHost(Host host) {
            if (host == null)
                return;
            if (metadata.remove(host)) {
                logger.info("Cassandra host {} removed", host);
                onRemove(host);
            }
        }
        public void ensurePoolsSizing() {
            for (SessionManager session : sessions) {
                for (HostConnectionPool pool : session.pools.values())
                    pool.ensureCoreConnections();
            }
        }
        public PreparedStatement addPrepared(PreparedStatement stmt) {
            PreparedStatement previous = preparedQueries.putIfAbsent(stmt.id, stmt);
            if (previous != null) {
                logger.warn("Re-preparing already prepared query {}. Please note that preparing the same query more than once is "
                          + "generally an anti-pattern and will likely affect performance. Consider preparing the statement only once.", stmt.getQueryString());
                // The one object in the cache will get GCed once it's not referenced by the client anymore since we use a weak reference.
                // So we need to make sure that the instance we do return to the user is the one that is in the cache.
                return previous;
            }
            return stmt;
        }
        private void prepareAllQueries(Host host) throws InterruptedException {
            if (preparedQueries.isEmpty())
                return;
            logger.debug("Preparing {} prepared queries on newly up node {}", preparedQueries.size(), host);
            try {
                Connection connection = connectionFactory.open(host);
                try
                {
                    try {
                        ControlConnection.waitForSchemaAgreement(connection);
                    } catch (ExecutionException e) {
                        // As below, just move on
                    }
                    // Furthermore, along with each prepared query we keep the current keyspace at the time of preparation
                    // as we need to make sure it is the same when we re-prepare on new/restarted nodes. Most queries will use the
                    // same keyspace so keeping it each time is slightly wasteful, but this doesn't really matter and is
                    // simpler. Besides, in prepareAllQueries we avoid setting the current keyspace more than needed.
                    // We need to make sure we prepare every query with the right current keyspace, i.e. the one originally
                    // used for preparing it. However, since it is likely that all prepared queries belong to only a handful
                    // of different keyspaces (possibly only one), and to avoid setting the current keyspace more than needed,
                    // we first sort the queries per keyspace.
                    SetMultimap<String, String> perKeyspace = HashMultimap.create();
                    for (PreparedStatement ps : preparedQueries.values()) {
                        // It's possible for a query to not have a current keyspace. But since null doesn't work well as
                        // map keys, we use the empty string instead (that is not a valid keyspace name).
                        String keyspace = ps.getQueryKeyspace() == null ? "" : ps.getQueryKeyspace();
                        perKeyspace.put(keyspace, ps.getQueryString());
                    }
                    for (String keyspace : perKeyspace.keySet())
                    {
                        // Empty string mean no particular keyspace to set
                        if (!keyspace.isEmpty())
                            connection.setKeyspace(keyspace);
                        List<Connection.Future> futures = new ArrayList<Connection.Future>(preparedQueries.size());
                        for (String query : perKeyspace.get(keyspace)) {
                            futures.add(connection.write(new PrepareMessage(query)));
                        }
                        for (Connection.Future future : futures) {
                            try {
                                future.get();
                            } catch (ExecutionException e) {
                                // This "might" happen if we drop a CF but haven't removed its prepared queries (which we don't do
                                // currently). It's not a big deal however as if it's a more serious problem it'll show up later when
                                // the query is tried for execution.
                                logger.debug("Unexpected error while preparing queries on new/newly up host", e);
                            }
                        }
                    }
                } finally {
                    connection.close(0, TimeUnit.MILLISECONDS);
                }
            } catch (ConnectionException e) {
                // Ignore, not a big deal
            } catch (AuthenticationException e) {
                // That's a bad news, but ignore at this point
            } catch (BusyConnectionException e) {
                // Ignore, not a big deal
            }
        }
        public void submitSchemaRefresh(final String keyspace, final String table) {
            logger.trace("Submitting schema refresh");
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        controlConnection.refreshSchema(keyspace, table);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        // Refresh the schema using the provided connection, and notify the future with the provided resultset once done
        public void refreshSchema(final Connection connection, final SimpleFuture<ResultSet> future, final ResultSet rs, final String keyspace, final String table) {
            if (logger.isDebugEnabled())
                logger.debug("Refreshing schema for {}{}", keyspace == null ? "" : keyspace, table == null ? "" : "." + table);
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Before refreshing the schema, wait for schema agreement so
                        // that querying a table just after having created it don't fail.
                        if (!ControlConnection.waitForSchemaAgreement(connection))
                            logger.warn("No schema agreement from live replicas after {} ms. The schema may not be up to date on some nodes.", ControlConnection.MAX_SCHEMA_AGREEMENT_WAIT_MS);
                        ControlConnection.refreshSchema(connection, keyspace, table, Cluster.Manager.this);
                    } catch (Exception e) {
                        logger.error("Error during schema refresh ({}). The schema from Cluster.getMetadata() might appear stale. Asynchronously submitting job to fix.", e.getMessage());
                        submitSchemaRefresh(keyspace, table);
                    } finally {
                        // Always sets the result
                        future.set(rs);
                    }
                }
            });
        }
        // Called when some message has been received but has been initiated from the server (streamId < 0).
        @Override
        public void handle(Message.Response response) {
            if (!(response instanceof EventMessage)) {
                logger.error("Received an unexpected message from the server: {}", response);
                return;
            }
            final Event event = ((EventMessage)response).event;
            logger.debug("Received event {}, scheduling delivery", response);
            // When handle is called, the current thread is a network I/O  thread, and we don't want to block
            // it (typically addHost() will create the connection pool to the new node, which can take time)
            // Besides, up events are usually sent a bit too early (since they're triggered once gossip is up,
            // but that before the client-side server is up) so adds a 1 second delay in that case.
            // TODO: this delay is honestly quite random. We should do something on the C* side to fix that.
            scheduledTasksExecutor.schedule(new Runnable() {
                @Override
                public void run() {
                    switch (event.type) {
                        case TOPOLOGY_CHANGE:
                            Event.TopologyChange tpc = (Event.TopologyChange)event;
                            switch (tpc.change) {
                                case NEW_NODE:
                                    addHost(tpc.node.getAddress(), true);
                                    break;
                                case REMOVED_NODE:
                                    removeHost(metadata.getHost(tpc.node.getAddress()));
                                    break;
                                case MOVED_NODE:
                                    controlConnection.refreshNodeListAndTokenMap();
                                    break;
                            }
                            break;
                        case STATUS_CHANGE:
                            Event.StatusChange stc = (Event.StatusChange)event;
                            switch (stc.status) {
                                case UP:
                                    Host hostUp = metadata.getHost(stc.node.getAddress());
                                    if (hostUp == null) {
                                        // first time we heard about that node apparently, add it
                                        addHost(stc.node.getAddress(), true);
                                    } else {
                                        onUp(hostUp);
                                    }
                                    break;
                                case DOWN:
                                    // Note that there is a slight risk we can receive the event late and thus
                                    // mark the host down even though we already had reconnected successfully.
                                    // But it is unlikely, and don't have too much consequence since we'll try reconnecting
                                    // right away, so we favor the detection to make the Host.isUp method more reliable.
                                    Host hostDown = metadata.getHost(stc.node.getAddress());
                                    if (hostDown != null) {
                                        onDown(hostDown);
                                    }
                                    break;
                            }
                            break;
                        case SCHEMA_CHANGE:
                            Event.SchemaChange scc = (Event.SchemaChange)event;
                            switch (scc.change) {
                                case CREATED:
                                    if (scc.table.isEmpty())
                                        submitSchemaRefresh(null, null);
                                    else
                                        submitSchemaRefresh(scc.keyspace, null);
                                    break;
                                case DROPPED:
                                    if (scc.table.isEmpty())
                                        submitSchemaRefresh(null, null);
                                    else
                                        submitSchemaRefresh(scc.keyspace, null);
                                    break;
                                case UPDATED:
                                    if (scc.table.isEmpty())
                                        submitSchemaRefresh(scc.keyspace, null);
                                    else
                                        submitSchemaRefresh(scc.keyspace, scc.table);
                                    break;
                            }
                            break;
                    }
                }
            }, delayForEvent(event), TimeUnit.SECONDS);
        }
        private int delayForEvent(Event event) {
            switch (event.type) {
                case TOPOLOGY_CHANGE:
                    // Could probably be 0 for REMOVED_NODE but it's inconsequential
                    return 1;
                case STATUS_CHANGE:
                    Event.StatusChange stc = (Event.StatusChange)event;
                    if (stc.status == Event.StatusChange.Status.UP)
                        return 1;
                    break;
            }
            return 0;
        }
    }