Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   *      Copyright (C) 2012 DataStax Inc.
   *
   *   Licensed under the Apache License, Version 2.0 (the "License");
   *   you may not use this file except in compliance with the License.
   *   You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
  *   Unless required by applicable law or agreed to in writing, software
  *   distributed under the License is distributed on an "AS IS" BASIS,
  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
 package com.datastax.driver.core;
 
 import java.util.*;
 
 
Handles a request to Cassandra, dealing with host failover and retries on unavailable/timeout errors.
 
 
     // Class-wide logger obtained from LoggerFactory for RequestHandler.
     private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);
 
     // Collaborators received through the constructor.
     private final SessionManager manager;
     private final Callback callback;
 
     // NOTE(review): the generic declarations below have lost their closing brackets and
     // separators in this listing (presumably Iterator<Host>, List<Host> and
     // Map<InetAddress, String>) -- confirm against the real source.
     private final Iterator<HostqueryPlan;
     private final Query query;
     private volatile Host current;
     private volatile List<HosttriedHosts;
     private volatile HostConnectionPool currentPool;
 
     // Retry bookkeeping; declared volatile like the rest of the mutable state.
     private volatile int queryRetries;
     private volatile ConsistencyLevel retryConsistencyLevel;
 
     // Presumably the per-host error messages collected by logError(); created lazily there -- TODO confirm.
     private volatile Map<InetAddressStringerrors;
 
     private volatile boolean isCanceled;
     private volatile Connection.ResponseHandler connectionHandler;
 
     // Timing state initialised in the constructor (timer context may be null, see constructor).
     private final TimerContext timerContext;
     private final long startTime;
 
     /**
      * Creates a handler for a single query and registers it with the given callback.
      *
      * NOTE(review): in this listing the assignment targets ("this. = ...") and the commas
      * between parameters were stripped; the targets are presumably the final fields of
      * matching type -- confirm against the real source.
      *
      * @param manager  session manager; its load balancing policy supplies the query plan
      * @param callback callback that is told about this handler via register(this)
      * @param query    the query used to build the query plan
      */
     public RequestHandler(SessionManager managerCallback callbackQuery query) {
         this. = manager;
         this. = callback;
 
         // Let the callback know about this handler before anything is sent.
         callback.register(this);
 
         // The load balancing policy produces the query plan for this query.
         this. = manager.loadBalancingPolicy().newQueryPlan(query);
         this. = query;
 
         // Only start a request timer when metrics are enabled; otherwise keep null.
         this. = metricsEnabled()
                           ? metrics().getRequestsTimer().time()
                           : null;
         // Timestamp taken with System.nanoTime() at construction.
         this. = System.nanoTime();
     }
 
     /**
      * Tells whether metrics are enabled, i.e. whether the configuration's metrics options are non-null.
      *
      * NOTE(review): the receiver of configuration() was stripped from this listing
      * (presumably the session manager's cluster) -- confirm.
      *
      * @return true when getMetricsOptions() is not null
      */
     private boolean metricsEnabled() {
         return .configuration().getMetricsOptions() != null;
     }
 
     /**
      * Returns the Metrics object used by this handler.
      *
      * NOTE(review): the returned expression is entirely elided in this listing ("...");
      * where the Metrics instance comes from cannot be determined here. Callers in this
      * class check metricsEnabled() before calling it.
      */
     private Metrics metrics() {
         return ...;
     }
 
     /**
      * Tries hosts one after the other until query(host) reports that the request was written.
      *
      * If the loop ends without a successful write, the callback is failed with a
      * NoHostAvailableException (built from an empty map when no errors map exists).
      * Any exception thrown while iterating is reported as a DriverInternalError.
      *
      * NOTE(review): the iterator being walked and the second loop condition were stripped
      * from this listing (presumably the query plan and the cancellation flag) -- confirm.
      */
     public void sendRequest() {
         try {
             while (.hasNext() && !) {
                 Host host = .next();
                 .trace("Querying node {}"host);
                 // Stop as soon as one host accepted the request.
                 if (query(host))
                     return;
             }
             // No host accepted the request: fail the callback.
             setFinalException(nullnew NoHostAvailableException( == null ? Collections.<InetAddressString>emptyMap() : ));
         } catch (Exception e) {
             // Shouldn't really happen, but if the iterator returned by the load balancing policy ever throws, we don't want to block.
             setFinalException(nullnew DriverInternalError("An unexpected error happened while sending requests"e));
        }
    }
    /**
     * Attempts to send the request to a single host.
     *
     * NOTE(review): several identifiers were stripped from this listing, and no statement
     * assigning {@code connection} is visible before connection.write(this) -- the line that
     * borrows a connection from the pool appears to be missing here. Confirm against the
     * real source before relying on this code.
     *
     * @param host the host to try
     * @return true if the request was written on a connection; false if the pool lookup
     *         yielded null or a shut-down pool, or if any of the caught exceptions occurred
     *         (in which case the caller moves on to the next host)
     */
    private boolean query(Host host) {
         = ..get(host);
        if ( == null || .isShutdown())
            return false;
        PooledConnection connection = null;
        try {
            // Note: it is not perfectly correct to use getConnectTimeoutMillis() here, but
            // until we provide a fancier way to control query timeouts, it's not a bad solution either
            // Presumably records the previously queried host in a lazily created list -- TODO confirm.
            if ( != null) {
                if ( == null)
                     = new ArrayList<Host>();
                .add();
            }
             = host;
             = connection.write(this);
            return true;
        } catch (ConnectionException e) {
            // If we have any problem with the connection, move to the next node.
            if (metricsEnabled())
                metrics().getErrorMetrics().getConnectionErrors().inc();
            if (connection != null)
                connection.release();
            logError(host.getAddress(), e.getMessage());
            return false;
        } catch (BusyConnectionException e) {
            // The pool shouldn't have given us a busy connection unless we've maxed out the pool, so move on to the next host.
            if (connection != null)
                connection.release();
            logError(host.getAddress(), e.getMessage());
            return false;
        } catch (TimeoutException e) {
            // We timed out, log it but move to the next node.
            logError(host.getAddress(), "Timeout while trying to acquire available connection (you may want to increase the driver number of per-host connections)");
            return false;
        } catch (RuntimeException e) {
            // Unexpected failure: release the connection if we hold one, log at error level, and try the next host.
            if (connection != null)
                connection.release();
            .error("Unexpected error while querying " + host.getAddress(), e);
            logError(host.getAddress(), e.getMessage());
            return false;
        }
    }
    /**
     * Logs, at debug level, that querying a host failed and records the message for that address.
     *
     * The map holding the messages is created lazily (a HashMap) the first time it is found null.
     * NOTE(review): the map and logger identifiers were stripped from this listing.
     *
     * @param address address of the host that failed
     * @param msg     error description stored for that address
     */
    private void logError(InetAddress addressString msg) {
        .debug("Error querying {}, trying next host (error is: {})"addressmsg);
        if ( == null)
             = new HashMap<InetAddressString>();
        .put(addressmsg);
    }
    /**
     * Schedules a retry of the request on an executor rather than on the calling thread.
     *
     * When retryCurrent is true the captured host {@code h} is tried first through query(h);
     * if that fails, or if retryCurrent is false, sendRequest() continues with further hosts.
     * Any exception raised by the task is reported to the callback as a DriverInternalError.
     *
     * NOTE(review): the parameter separator and the value assigned to {@code h} were stripped
     * from this listing (presumably the current host) -- confirm.
     */
    private void retry(final boolean retryCurrentConsistencyLevel newConsistencyLevel) {
        final Host h = ;
        this. = newConsistencyLevel;
        // We should not retry on the current thread as this will be an IO thread.
        .executor().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    if (retryCurrent) {
                        // Same host first; done if the write succeeded.
                        if (query(h))
                            return;
                    }
                    sendRequest();
                } catch (Exception e) {
                    setFinalException(nullnew DriverInternalError("Unexpected exception while retrying query"e));
                }
            }
        });
    }
    /**
     * Cancels this request: sets a flag to true and, if a handler reference is non-null,
     * calls cancelHandler() on it.
     *
     * NOTE(review): both identifiers were stripped from this listing (presumably the
     * isCanceled flag and the connection handler) -- confirm.
     */
    public void cancel() {
         = true;
        if ( != null)
            .cancelHandler();
    }
    /**
     * Returns the message to send for this query.
     *
     * When the guarded value is non-null (presumably the retry consistency level -- the
     * identifier was stripped from this listing), a QueryMessage or ExecuteMessage whose
     * consistency differs from the converted Cassandra consistency level is replaced by a
     * new message built with that level. Other message types are returned unchanged.
     *
     * @return the request to write on the connection
     */
    @Override
    public Message.Request request() {
        Message.Request request = .request();
        if ( != null) {
            org.apache.cassandra.db.ConsistencyLevel cl = ConsistencyLevel.toCassandraCL();
            if (request instanceof QueryMessage) {
                QueryMessage qm = (QueryMessage)request;
                // Only rebuild the message when the consistency actually changes.
                if (qm.consistency != cl)
                    request = new QueryMessage(qm.querycl);
            }
            else if (request instanceof ExecuteMessage) {
                ExecuteMessage em = (ExecuteMessage)request;
                if (em.consistency != cl)
                    request = new ExecuteMessage(em.statementIdem.valuescl);
            }
        }
        return request;
    }
    /**
     * Delivers the final response to the callback together with an ExecutionInfo and the
     * elapsed time computed from System.nanoTime().
     *
     * If anything throws while preparing or delivering the result, the callback's
     * onException is invoked instead with a DriverInternalError.
     *
     * NOTE(review): most identifiers in the body were stripped from this listing (timer
     * context, tried-hosts list, retry consistency level, start time are the likely
     * candidates) -- confirm against the real source.
     */
    private void setFinalResult(Connection connectionMessage.Response response) {
        try {
            // Stop the timer if there is one.
            if ( != null)
                .stop();
            ExecutionInfo info = .;
            if ( != null)
            {
                .add();
                info = new ExecutionInfo();
            }
            if ( != null)
                info = info.withAchievedConsistency();
            .onSet(connectionresponseinfo, System.nanoTime() - );
        } catch (Exception e) {
            .onException(connectionnew DriverInternalError("Unexpected exception while setting final result from " + responsee), System.nanoTime() - );
        }
    }
    /**
     * Reports a terminal failure to the callback.
     *
     * The stop() call is made inside try and the onException notification inside finally,
     * so the notification is still issued if stop() throws.
     *
     * NOTE(review): the stopped object, the notified object and the subtracted start value
     * were stripped from this listing.
     */
    private void setFinalException(Connection connectionException exception) {
        try {
            if ( != null)
                .stop();
        } finally {
            .onException(connectionexception, System.nanoTime() - );
        }
    }
    /**
     * Handles a response received on a connection.
     *
     * A pooled connection is released first. The response is then dispatched on its type:
     * one case completes the request directly, and the case that casts the response to
     * ErrorMessage consults the retry policy (for read timeout, write timeout and unavailable
     * errors), moves to the next host, or re-prepares an unknown prepared statement.
     * Whatever happens, latency is reported for the queried host in the finally block when
     * that host is non-null.
     *
     * NOTE(review): all case labels and many identifiers were stripped from this listing;
     * the per-case descriptions below are derived from the assertions, casts and calls that
     * remain visible and should be confirmed against the real source.
     */
    @Override
    public void onSet(Connection connectionMessage.Response responselong latency) {
        Host queriedHost = ;
        try {
            if (connection instanceof PooledConnection)
                ((PooledConnection)connection).release();
            switch (response.type) {
                case :
                    setFinalResult(connectionresponse);
                    break;
                case :
                    // Error response: decide between retrying, moving on, or surfacing the error.
                    ErrorMessage err = (ErrorMessage)response;
                    RetryPolicy.RetryDecision retry = null;
                    // Use the retry policy from the first receiver when non-null, else the configured one.
                    RetryPolicy retryPolicy = .getRetryPolicy() == null
                                            ? .configuration().getPolicies().getRetryPolicy()
                                            : .getRetryPolicy();
                    switch (err.error.code()) {
                        case :
                            // Read timeout: count it and ask the retry policy.
                            assert err.error instanceof ReadTimeoutException;
                            if (metricsEnabled())
                                metrics().getErrorMetrics().getReadTimeouts().inc();
                            ReadTimeoutException rte = (ReadTimeoutException)err.error;
                            ConsistencyLevel rcl = ConsistencyLevel.from(rte.consistency);
                            retry = retryPolicy.onReadTimeout(rclrte.blockForrte.receivedrte.dataPresent);
                            break;
                        case :
                            // Write timeout: count it and ask the retry policy.
                            assert err.error instanceof WriteTimeoutException;
                            if (metricsEnabled())
                                metrics().getErrorMetrics().getWriteTimeouts().inc();
                            WriteTimeoutException wte = (WriteTimeoutException)err.error;
                            ConsistencyLevel wcl = ConsistencyLevel.from(wte.consistency);
                            retry = retryPolicy.onWriteTimeout(wcl, WriteType.from(wte.writeType), wte.blockForwte.received);
                            break;
                        case :
                            // Unavailable: count it and ask the retry policy.
                            assert err.error instanceof UnavailableException;
                            if (metricsEnabled())
                                metrics().getErrorMetrics().getUnavailables().inc();
                            UnavailableException ue = (UnavailableException)err.error;
                            ConsistencyLevel ucl = ConsistencyLevel.from(ue.consistency);
                            retry = retryPolicy.onUnavailable(uclue.requiredue.alive);
                            break;
                        case :
                            // Try another node
                            .warn("Host {} is overloaded, trying next host."connection.address);
                            logError(connection.address"Host overloaded");
                            if (metricsEnabled())
                                metrics().getErrorMetrics().getOthers().inc();
                            retry(falsenull);
                            return;
                        case :
                            // Try another node
                            .error("Query sent to {} but it is bootstrapping. This shouldn't happen but trying next host."connection.address);
                            logError(connection.address"Host is boostrapping");
                            if (metricsEnabled())
                                metrics().getErrorMetrics().getOthers().inc();
                            retry(falsenull);
                            return;
                        case :
                            // The host does not know the prepared statement: prepare it there, then retry.
                            assert err.error instanceof PreparedQueryNotFoundException;
                            PreparedQueryNotFoundException pqnf = (PreparedQueryNotFoundException)err.error;
                            PreparedStatement toPrepare = ....get(pqnf.id);
                            if (toPrepare == null) {
                                // This shouldn't happen
                                String msg = String.format("Tried to execute unknown prepared query %s"pqnf.id);
                                .error(msg);
                                setFinalException(connectionnew DriverInternalError(msg));
                                return;
                            }
                            .info("Query {} is not prepared on {}, preparing before retrying executing. "
                                      + "Seeing this message a few times is fine, but seeing it a lot may be source of performance problems",
                                        toPrepare.getQueryString(), connection.address);
                            String currentKeyspace = connection.keyspace();
                            String prepareKeyspace = toPrepare.getQueryKeyspace();
                            // This shouldn't happen in normal use, because a user shouldn't try to execute
                            // a prepared statement with the wrong keyspace set. However, if it does, we'd rather
                            // prepare the query correctly and let the query execution return a meaningful error message
                            if (prepareKeyspace != null && (currentKeyspace == null || !currentKeyspace.equals(prepareKeyspace)))
                            {
                                .trace("Setting keyspace for prepared query to {}"prepareKeyspace);
                                connection.setKeyspace(prepareKeyspace);
                            }
                            try {
                                connection.write(prepareAndRetry(toPrepare.getQueryString()));
                            } finally {
                                // Always reset the previous keyspace if needed
                                if (connection.keyspace() == null || !connection.keyspace().equals(currentKeyspace))
                                {
                                    .trace("Setting back keyspace post query preparation to {}"currentKeyspace);
                                    connection.setKeyspace(currentKeyspace);
                                }
                            }
                            // we're done for now, the prepareAndRetry callback will handle the rest
                            return;
                        default:
                            // Any other error code is only counted; retry stays null so the error is surfaced below.
                            if (metricsEnabled())
                                metrics().getErrorMetrics().getOthers().inc();
                            break;
                    }
                    // No retry decision: hand the error response to the callback as the final result.
                    if (retry == null)
                        setFinalResult(connectionresponse);
                    else {
                        // Case labels stripped; judging by the bodies these are presumably retry / rethrow / ignore -- TODO confirm.
                        switch (retry.getType()) {
                            case :
                                ++;
                                if (.isTraceEnabled())
                                    .trace("Doing retry {} for query {} at consistency {}"new Object[]{ retry.getRetryConsistencyLevel()});
                                if (metricsEnabled())
                                    metrics().getErrorMetrics().getRetries().inc();
                                retry(trueretry.getRetryConsistencyLevel());
                                break;
                            case :
                                setFinalResult(connectionresponse);
                                break;
                            case :
                                // The error is swallowed: the callback receives an empty (Void) result instead.
                                if (metricsEnabled())
                                    metrics().getErrorMetrics().getIgnores().inc();
                                setFinalResult(connectionnew ResultMessage.Void());
                                break;
                        }
                    }
                    break;
                default:
                    setFinalResult(connectionresponse);
                    break;
            }
        } catch (Exception e) {
            // Anything unexpected is reported to the callback rather than propagated.
            setFinalException(connectione);
        } finally {
            // Report latency for the host that was queried, on every exit path.
            if (queriedHost != null)
                ...reportLatency(queriedHostlatency);
        }
    }
    /**
     * Builds the callback used to prepare a statement on a host before retrying the
     * original request.
     *
     * The callback sends a PrepareMessage for the given query string. Depending on the
     * response it either retries on the same host (retry(true, ...)), moves on to another
     * host after recording an error (retry(false, ...)), or passes the response to
     * setFinalResult. Exceptions are delegated to the enclosing handler's onException, and
     * a timeout records an error and moves on to another host.
     *
     * NOTE(review): case labels and the compared result kind were stripped from this listing.
     *
     * @param toPrepare query string to prepare
     * @return the response callback to write on the connection
     */
    private Connection.ResponseCallback prepareAndRetry(final String toPrepare) {
        return new Connection.ResponseCallback() {
            @Override
            public Message.Request request() {
                return new PrepareMessage(toPrepare);
            }
            @Override
            public void onSet(Connection connectionMessage.Response responselong latency) {
                // TODO should we check the response ?
                switch (response.type) {
                    case :
                        // A result message of the expected kind means the query is prepared: retry on the same host.
                        if (((ResultMessage)response). == ..) {
                            .trace("Scheduling retry now that query is prepared");
                            retry(truenull);
                        } else {
                            logError(connection.address"Got unexpected response to prepare message: " + response);
                            retry(falsenull);
                        }
                        break;
                    case :
                        // Preparation failed: record it, count it, and try another host.
                        logError(connection.address"Error preparing query, got " + response);
                        if (metricsEnabled())
                            metrics().getErrorMetrics().getOthers().inc();
                        retry(falsenull);
                        break;
                    default:
                        // Something's wrong, so we return but we let setFinalResult propagate the exception
                        RequestHandler.this.setFinalResult(connectionresponse);
                        break;
                }
            }
            @Override
            public void onException(Connection connectionException exceptionlong latency) {
                RequestHandler.this.onException(connectionexceptionlatency);
            }
            @Override
            public void onTimeout(Connection connectionlong latency) {
                logError(connection.address"Timeout waiting for response to prepare message");
                retry(falsenull);
            }
        };
    }
    /**
     * Handles an exception raised while the request was in flight.
     *
     * A pooled connection is released first. A ConnectionException is counted (when metrics
     * are enabled), recorded through logError and followed by a retry on another host; any
     * other exception is reported to the callback as the final outcome. Latency is reported
     * for the queried host in the finally block when that host is non-null.
     *
     * NOTE(review): the value assigned to queriedHost and the receiver of reportLatency were
     * stripped from this listing.
     */
    @Override
    public void onException(Connection connectionException exceptionlong latency) {
        Host queriedHost = ;
        try {
            if (connection instanceof PooledConnection)
                ((PooledConnection)connection).release();
            if (exception instanceof ConnectionException) {
                // Connection-level failure: move on to another host instead of failing the request.
                if (metricsEnabled())
                    metrics().getErrorMetrics().getConnectionErrors().inc();
                ConnectionException ce = (ConnectionException)exception;
                logError(ce.addressce.getMessage());
                retry(falsenull);
                return;
            }
            setFinalException(connectionexception);
        } catch (Exception e) {
            // This shouldn't happen, but if it does, we want to signal the callback, not let it hang indefinitely
            setFinalException(nullnew DriverInternalError("An unexpected error happened while handling exception " + exceptione));
        } finally {
            if (queriedHost != null)
                ...reportLatency(queriedHostlatency);
        }
    }
    /**
     * Handles a timeout while waiting for the response.
     *
     * A pooled connection is released, the timeout is recorded through logError, and the
     * request is retried on another host (retry(false, ...)). Latency is reported for the
     * queried host in the finally block when that host is non-null.
     *
     * NOTE(review): the value assigned to queriedHost and the receiver of reportLatency were
     * stripped from this listing.
     */
    @Override
    public void onTimeout(Connection connectionlong latency) {
        Host queriedHost = ;
        try {
            if (connection instanceof PooledConnection)
                ((PooledConnection)connection).release();
            logError(connection.address"Timeout during read");
            retry(falsenull);
        } catch (Exception e) {
            // This shouldn't happen, but if it does, we want to signal the callback, not let it hang indefinitely
            setFinalException(nullnew DriverInternalError("An unexpected error happened while handling timeout"e));
        } finally {
            if (queriedHost != null)
                ...reportLatency(queriedHostlatency);
        }
    }
    /**
     * Callback notified of the outcome of a request handled by RequestHandler.
     * Extends Connection.ResponseCallback with a result notification that also carries an
     * ExecutionInfo, and with a hook through which the handler registers itself.
     */
    interface Callback extends Connection.ResponseCallback {
        // Called with the final response, its ExecutionInfo and a latency value.
        public void onSet(Connection connectionMessage.Response responseExecutionInfo infolong latency);
        // Called from the RequestHandler constructor with the handler being created.
        public void register(RequestHandler handler);
    }
New to GrepCode? Check out our FAQ X