  /*
   *      Copyright (C) 2012 DataStax Inc.
   *
   *   Licensed under the Apache License, Version 2.0 (the "License");
   *   you may not use this file except in compliance with the License.
   *   You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
  *   Unless required by applicable law or agreed to in writing, software
  *   distributed under the License is distributed on an "AS IS" BASIS,
  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
 package com.datastax.driver.core.policies;
 
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.datastax.driver.core.*;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
A wrapper load balancing policy that adds latency awareness to a child policy.

When used, this policy will collect the latencies of the queries to each Cassandra node and maintain a per-node latency score (an average). Based on these scores, the policy will penalize (technically, it will ignore them unless no other nodes are up) the nodes that are slower than the best performing node by more than some configurable amount (the exclusion threshold).

The latency score for a given node is based on a form of exponential moving average. In other words, the latency score of a node is the average of its previously measured latencies, but where older measurements get an exponentially decreasing weight. The exact weight applied to a newly received latency is based on the time elapsed since the previous measure (to account for the fact that latencies are not necessarily reported with equal regularity, neither over time nor between different nodes).

Once a node is excluded from query plans (because its averaged latency grew over the exclusion threshold), its latency score will not be updated anymore (since it is not queried). To give a chance to this node to recover, the policy has a configurable retry period. The policy will not penalize a host for which no measurement has been collected for more than this retry period.

Please see the LatencyAwarePolicy.Builder class and methods for more details on the possible parameters of this policy.

Since:
1.0.4
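As a usage sketch (not from this file; it assumes the 1.x driver API with `RoundRobinPolicy` as the wrapped child policy, and a placeholder contact point), the policy is typically installed when building a `Cluster`:

```java
// Illustrative sketch only: assumes the DataStax Java driver 1.x on the classpath.
// Cluster, RoundRobinPolicy and LatencyAwarePolicy are driver classes; the contact
// point "127.0.0.1" is a placeholder.
Cluster cluster = Cluster.builder()
    .addContactPoint("127.0.0.1")
    .withLoadBalancingPolicy(
        LatencyAwarePolicy.builder(new RoundRobinPolicy())
            .withExclusionThreshold(2.0)
            .build())
    .build();
```

All other parameters (scale, retry period, update rate, minimum measurements) keep their defaults here; see the Builder methods below.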
 
 public class LatencyAwarePolicy implements LoadBalancingPolicy {
 
     private static final Logger logger = LoggerFactory.getLogger(LatencyAwarePolicy.class);
 
     private final LoadBalancingPolicy childPolicy;
     private final Tracker latencyTracker;
     private final ScheduledExecutorService updaterService = Executors.newSingleThreadScheduledExecutor(threadFactory("LatencyAwarePolicy updater"));
 
     private final double exclusionThreshold;
 
     private final long scale;
     private final long retryPeriod;
     private final long minMeasure;
 
    private LatencyAwarePolicy(LoadBalancingPolicy childPolicy,
                               double exclusionThreshold,
                               long scale,
                               long retryPeriod,
                               long updateRate,
                               int minMeasure) {
        this.childPolicy = childPolicy;
        this.retryPeriod = retryPeriod;
        this.scale = scale;
        this.latencyTracker = new Tracker();
        this.exclusionThreshold = exclusionThreshold;
        this.minMeasure = minMeasure;

        updaterService.scheduleAtFixedRate(new Updater(), updateRate, updateRate, TimeUnit.NANOSECONDS);
    }

    
Creates a new latency aware policy builder given the child policy that the resulting policy should wrap.

Parameters:
childPolicy the load balancing policy to wrap with latency awareness.
Returns:
the created builder.
 
     public static Builder builder(LoadBalancingPolicy childPolicy) {
         return new Builder(childPolicy);
    }
    private class Updater implements Runnable {

        private Set<Host> excludedAtLastTick = Collections.<Host>emptySet();

        @Override
        public void run() {
            try {
                logger.trace("Updating LatencyAwarePolicy minimum");
                latencyTracker.updateMin();

                if (logger.isDebugEnabled()) {
                    /*
                     * For users to be able to know if the policy potentially needs tuning, we need to provide
                     * some feedback on how things evolve. For that, we use the min computation to also check
                     * which hosts would be excluded if a query were submitted now, and if any host is, we log
                     * it (but we try to avoid flooding too). This is probably interesting information anyway
                     * since it gives an idea of which hosts perform badly.
                     */
                    Set<Host> excludedThisTick = new HashSet<Host>();
                    double currentMin = latencyTracker.getMinAverage();
                    for (Map.Entry<Host, Snapshot.Stats> entry : getScoresSnapshot().getAllStats().entrySet()) {
                        Host host = entry.getKey();
                        Snapshot.Stats stats = entry.getValue();
                        if (stats.getMeasurementsCount() < minMeasure)
                            continue;

                        if (stats.lastUpdatedSince() > retryPeriod) {
                            if (excludedAtLastTick.contains(host))
                                logger.debug(String.format("Previously avoided host %s has not been queried since %.3fms: will be reconsidered.", host, inMS(stats.lastUpdatedSince())));
                            continue;
                        }

                        if (stats.getLatencyScore() > ((long)(exclusionThreshold * currentMin))) {
                            excludedThisTick.add(host);
                            if (!excludedAtLastTick.contains(host))
                                logger.debug(String.format("Host %s has an average latency score of %.3fms, more than %f times more than the minimum %.3fms: will be avoided temporarily.",
                                                           host, inMS(stats.getLatencyScore()), exclusionThreshold, inMS(currentMin)));
                            continue;
                        }

                        if (excludedAtLastTick.contains(host)) {
                            logger.debug("Previously avoided host {} average latency has come back within accepted bounds: will be reconsidered.", host);
                        }
                    }
                    excludedAtLastTick = excludedThisTick;
                }
            } catch (RuntimeException e) {
                // An unexpected exception would suppress further execution, so catch, log, but swallow after that.
                logger.error("Error while updating LatencyAwarePolicy minimum", e);
            }
        }
    }
    private static double inMS(long nanos) {
        return ((double)nanos) / (1000 * 1000);
    }
    private static double inMS(double nanos) {
        return nanos / (1000 * 1000);
    }
    private static ThreadFactory threadFactory(String nameFormat) {
        return new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
    }
    @Override
    public void init(Cluster cluster, Collection<Host> hosts) {
        childPolicy.init(cluster, hosts);
        cluster.register(latencyTracker);
    }

    
Returns the HostDistance for the provided host.

Parameters:
host the host for which to return the distance.
Returns:
the HostDistance to host as returned by the wrapped policy.
    @Override
    public HostDistance distance(Host host) {
        return childPolicy.distance(host);
    }

    
Returns the hosts to use for a new query.

The returned plan will be the same as the plan generated by the child policy, but with the (initial) exclusion of hosts whose recent (averaged) latency is more than exclusionThreshold * minLatency (where minLatency is the (averaged) latency of the fastest host).

The hosts that are initially excluded due to their latency will be returned by this iterator, but only after all non-excluded hosts of the child policy have been returned.

Parameters:
query the query for which to build the plan.
Returns:
the new query plan.
    @Override
    public Iterator<Host> newQueryPlan(Query query) {
        final Iterator<Host> childIter = childPolicy.newQueryPlan(query);
        return new AbstractIterator<Host>() {

            private Queue<Host> skipped;

            @Override
            protected Host computeNext() {
                long min = latencyTracker.getMinAverage();
                long now = System.nanoTime();
                while (childIter.hasNext()) {
                    Host host = childIter.next();
                    TimestampedAverage latency = latencyTracker.latencyOf(host);

                    // If we haven't had enough data points yet to have a score, or the last update of the score
                    // is just too old, include the host.
                    if (min < 0 || latency == null || latency.nbMeasure < minMeasure || (now - latency.timestamp) > retryPeriod)
                        return host;

                    // If the host latency is within an acceptable bound of the fastest known host, return
                    // that host. Otherwise, skip it.
                    if (latency.average <= ((long)(exclusionThreshold * (double)min)))
                        return host;

                    if (skipped == null)
                        skipped = new ArrayDeque<Host>();
                    skipped.offer(host);
                }

                if (skipped != null && !skipped.isEmpty())
                    return skipped.poll();

                return endOfData();
            }
        };
    }
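The skip-then-append behavior of this query plan can be illustrated with a small, self-contained sketch (plain strings stand in for hosts; the scores and threshold are made-up numbers, and `reorder` is a hypothetical helper, not part of the driver):

```java
import java.util.*;

// Standalone illustration of the reordering idea in newQueryPlan: hosts whose
// score exceeds threshold * min are moved to the back of the plan, not dropped.
public class PlanSketch {
    static List<String> reorder(List<String> plan, Map<String, Long> scores,
                                double threshold) {
        long min = Collections.min(scores.values());
        List<String> result = new ArrayList<String>();
        Deque<String> skipped = new ArrayDeque<String>();
        for (String host : plan) {
            Long score = scores.get(host);
            // No score yet: include the host as-is, like the real policy does.
            if (score == null || score <= (long)(threshold * min))
                result.add(host);
            else
                skipped.offer(host);
        }
        result.addAll(skipped); // excluded hosts come last, as a fallback
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> scores = new HashMap<String, Long>();
        scores.put("a", 100L); // fastest
        scores.put("b", 350L); // more than 2x slower: penalized
        scores.put("c", 150L);
        System.out.println(reorder(Arrays.asList("a", "b", "c"), scores, 2.0));
        // prints [a, c, b]
    }
}
```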

    
Returns a snapshot of the scores (latency averages) maintained by this policy.

Returns:
a new (immutable) LatencyAwarePolicy.Snapshot object containing the current latency scores maintained by this policy.
    public Snapshot getScoresSnapshot() {
        Map<Host, TimestampedAverage> currentLatencies = latencyTracker.currentLatencies();
        ImmutableMap.Builder<Host, Snapshot.Stats> builder = ImmutableMap.builder();
        long now = System.nanoTime();
        for (Map.Entry<Host, TimestampedAverage> entry : currentLatencies.entrySet()) {
            Host host = entry.getKey();
            TimestampedAverage latency = entry.getValue();
            Snapshot.Stats stats = new Snapshot.Stats(now - latency.timestamp, latency.average, latency.nbMeasure);
            builder.put(host, stats);
        }
        return new Snapshot(builder.build());
    }
    @Override
    public void onUp(Host host) {
        childPolicy.onUp(host);
    }

    @Override
    public void onDown(Host host) {
        childPolicy.onDown(host);
        latencyTracker.resetHost(host);
    }

    @Override
    public void onAdd(Host host) {
        childPolicy.onAdd(host);
    }

    @Override
    public void onRemove(Host host) {
        childPolicy.onRemove(host);
        latencyTracker.resetHost(host);
    }

    
An immutable snapshot of the per-host scores (and stats in general) maintained by LatencyAwarePolicy to base its decision upon.
    public static class Snapshot {
        private final Map<Host, Stats> stats;

        private Snapshot(Map<Host, Stats> stats) {
            this.stats = stats;
        }

        
A map with the stats for all hosts tracked by the LatencyAwarePolicy at the time of the snapshot.

Returns:
an immutable map with all the stats contained in this snapshot.
        public Map<Host, Stats> getAllStats() {
            return stats;
        }

        
The Stats object for a given host.

Parameters:
host the host to return the stats of.
Returns:
the Stats for host in this snapshot, or null if the snapshot has no information on host.
        public Stats getStats(Host host) {
            return stats.get(host);
        }

        
A snapshot of the statistics on a given host kept by LatencyAwarePolicy.
        public static class Stats {
            private final long lastUpdatedSince;
            private final long average;
            private final long nbMeasurements;

            private Stats(long lastUpdatedSince, long average, long nbMeasurements) {
                this.lastUpdatedSince = lastUpdatedSince;
                this.average = average;
                this.nbMeasurements = nbMeasurements;
            }

            
The number of nanoseconds since the last latency update was recorded (at the time of the snapshot).

Returns:
The number of nanoseconds since the last latency update was recorded (at the time of the snapshot).
            public long lastUpdatedSince() {
                return lastUpdatedSince;
            }

            
The latency score for the host this is the stats of at the time of the snapshot.

Returns:
the latency score for the host this is the stats of at the time of the snapshot, or -1L if not enough measurements have been taken to assign a score.
            public long getLatencyScore() {
                return average;
            }

            
The number of recorded latency measurements for the host this is the stats of.

Returns:
the number of recorded latency measurements for the host this is the stats of.
            public long getMeasurementsCount() {
                return nbMeasurements;
            }
        }
    }
    private class Tracker implements LatencyTracker {

        private final ConcurrentMap<Host, HostLatencyTracker> latencies = new ConcurrentHashMap<Host, HostLatencyTracker>();
        private volatile long cachedMin = -1L;

        public void update(Host host, long newLatencyNanos) {
            HostLatencyTracker hostTracker = latencies.get(host);
            if (hostTracker == null) {
                hostTracker = new HostLatencyTracker(scale, (30L * minMeasure) / 100L);
                HostLatencyTracker old = latencies.putIfAbsent(host, hostTracker);
                if (old != null)
                    hostTracker = old;
            }
            hostTracker.add(newLatencyNanos);
        }

        public void updateMin() {
            long newMin = Long.MAX_VALUE;
            long now = System.nanoTime();
            for (HostLatencyTracker tracker : latencies.values()) {
                TimestampedAverage latency = tracker.getCurrentAverage();
                if (latency != null && latency.average >= 0 && latency.nbMeasure >= minMeasure && (now - latency.timestamp) <= retryPeriod)
                    newMin = Math.min(newMin, latency.average);
            }
            if (newMin != Long.MAX_VALUE)
                cachedMin = newMin;
        }

        public long getMinAverage() {
            return cachedMin;
        }

        public TimestampedAverage latencyOf(Host host) {
            HostLatencyTracker tracker = latencies.get(host);
            return tracker == null ? null : tracker.getCurrentAverage();
        }

        public Map<Host, TimestampedAverage> currentLatencies() {
            Map<Host, TimestampedAverage> map = new HashMap<Host, TimestampedAverage>(latencies.size());
            for (Map.Entry<Host, HostLatencyTracker> entry : latencies.entrySet())
                map.put(entry.getKey(), entry.getValue().getCurrentAverage());
            return map;
        }

        public void resetHost(Host host) {
            latencies.remove(host);
        }
    }
    private static class TimestampedAverage {

        private final long timestamp;
        private final long average;
        private final long nbMeasure;

        TimestampedAverage(long timestamp, long average, long nbMeasure) {
            this.timestamp = timestamp;
            this.average = average;
            this.nbMeasure = nbMeasure;
        }
    }
    private static class HostLatencyTracker {

        private final long thresholdToAccount;
        private final double scale;
        private final AtomicReference<TimestampedAverage> current = new AtomicReference<TimestampedAverage>();

        HostLatencyTracker(long scale, long thresholdToAccount) {
            this.scale = (double)scale; // We keep in double since that's how we'll use it.
            this.thresholdToAccount = thresholdToAccount;
        }

        public void add(long newLatencyNanos) {
            TimestampedAverage previous, next;
            do {
                previous = current.get();
                next = computeNextAverage(previous, newLatencyNanos);
            } while (next != null && !current.compareAndSet(previous, next));
        }

        private TimestampedAverage computeNextAverage(TimestampedAverage previous, long newLatencyNanos) {
            long currentTimestamp = System.nanoTime();

            long nbMeasure = previous == null ? 1 : previous.nbMeasure + 1;
            if (nbMeasure < thresholdToAccount)
                return new TimestampedAverage(currentTimestamp, -1L, nbMeasure);

            if (previous == null || previous.average < 0)
                return new TimestampedAverage(currentTimestamp, newLatencyNanos, nbMeasure);

            // Note: it's possible for the delay to be 0, in which case newLatencyNanos will basically be
            // discarded. It's fine: nanoTime is precise enough in practice that even if it happens, it
            // will be very rare, and discarding a latency every once in a while is not the end of the world.
            // We do test for negative values, even though in theory that should not happen, because it seems
            // that historically there have been bugs here (https://blogs.oracle.com/dholmes/entry/inside_the_hotspot_vm_clocks),
            // so while this is almost surely not a problem anymore, there's no reason to break the computation
            // if this ever happens.
            long delay = currentTimestamp - previous.timestamp;
            if (delay <= 0)
                return null;

            double scaledDelay = ((double)delay) / scale;
            // Note: We don't use log1p because it's quite a bit slower and we don't care about the precision (and since we
            // refuse ridiculously big scales, scaledDelay can't be so low that scaledDelay+1 == 1.0 (due to rounding)).
            double prevWeight = Math.log(scaledDelay + 1) / scaledDelay;
            long newAverage = (long)((1.0 - prevWeight) * newLatencyNanos + prevWeight * previous.average);

            return new TimestampedAverage(currentTimestamp, newAverage, nbMeasure);
        }

        public TimestampedAverage getCurrentAverage() {
            return current.get();
        }
    }
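The time-based weighting in computeNextAverage can be checked in isolation. A minimal standalone sketch (the class and method names are illustrative, not part of the driver): it reproduces prevWeight = ln(d+1)/d, where d is the delay since the previous measure expressed in scale units:

```java
// Standalone illustration of the decay weighting used in computeNextAverage.
// The smaller the delay since the previous measure, the closer prevWeight is
// to 1, i.e. the more the old average dominates the updated one.
public class EwmaSketch {
    // Weight given to the previous average after a delay of scaledDelay scale units.
    static double prevWeight(double scaledDelay) {
        return Math.log(scaledDelay + 1) / scaledDelay;
    }

    static long nextAverage(long prevAverage, long newLatency, double scaledDelay) {
        double w = prevWeight(scaledDelay);
        return (long)((1.0 - w) * newLatency + w * prevAverage);
    }

    public static void main(String[] args) {
        // A measurement arriving 10ms after the previous one, with a 100ms scale
        // (d = 0.1): the new latency accounts for only about 5% of the new average,
        // matching the example in the withScale javadoc.
        System.out.printf("alpha = %.3f%n", 1.0 - prevWeight(0.1));
        System.out.println(nextAverage(1000000L, 2000000L, 0.1));
    }
}
```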

    
Helper builder object to create a latency aware policy.

This builder allows configuring the different parameters used by LatencyAwarePolicy. The only mandatory parameter is the child policy that will be wrapped with latency awareness. The other parameters can be set through the methods of this builder, but all have defaults (documented in the javadoc of each method) if you don't set them.

If you observe that the resulting policy excludes hosts too aggressively or not enough so, the main parameters to check are the exclusion threshold (withExclusionThreshold(double)) and scale (withScale(long,java.util.concurrent.TimeUnit)).

Since:
1.0.4
    public static class Builder {

        private static final double DEFAULT_EXCLUSION_THRESHOLD = 2.0;
        private static final long DEFAULT_SCALE = TimeUnit.MILLISECONDS.toNanos(100);
        private static final long DEFAULT_RETRY_PERIOD = TimeUnit.SECONDS.toNanos(10);
        private static final long DEFAULT_UPDATE_RATE = TimeUnit.MILLISECONDS.toNanos(100);
        private static final int DEFAULT_MIN_MEASURE = 50;

        private final LoadBalancingPolicy childPolicy;

        private double exclusionThreshold = DEFAULT_EXCLUSION_THRESHOLD;
        private long scale = DEFAULT_SCALE;
        private long retryPeriod = DEFAULT_RETRY_PERIOD;
        private long updateRate = DEFAULT_UPDATE_RATE;
        private int minMeasure = DEFAULT_MIN_MEASURE;

        
Creates a new latency aware policy builder given the child policy that the resulting policy wraps.

Parameters:
childPolicy the load balancing policy to wrap with latency awareness.
        public Builder(LoadBalancingPolicy childPolicy) {
            this.childPolicy = childPolicy;
        }

        
Sets the exclusion threshold to use for the resulting latency aware policy.

The exclusion threshold controls how much worse the average latency of a node must be compared to the fastest performing node for it to be penalized by the policy.

The default exclusion threshold (if this method is not called) is 2. In other words, the resulting policy excludes nodes that are more than twice slower than the fastest node.

Parameters:
exclusionThreshold the exclusion threshold to use. Must be greater or equal to 1.
Returns:
this builder.
Throws:
java.lang.IllegalArgumentException if exclusionThreshold &lt; 1.
        public Builder withExclusionThreshold(double exclusionThreshold) {
            if (exclusionThreshold < 1d)
                throw new IllegalArgumentException("Invalid exclusion threshold, must be greater than or equal to 1.");
            this.exclusionThreshold = exclusionThreshold;
            return this;
        }

        
Sets the scale to use for the resulting latency aware policy.

The scale provides control over how the weight given to older latencies decreases over time. For a given host, if a new latency \(l\) is received at time \(t\), and the previously calculated average is \(prev\) calculated at time \(t'\), then the newly calculated average \(avg\) for that host is calculated thusly: \[ d = \frac{t - t'}{scale} \\ \alpha = 1 - \left(\frac{\ln(d+1)}{d}\right) \\ avg = \alpha * l + (1-\alpha) * prev \] Typically, with a scale of 100 milliseconds (the default), if a new latency is measured and the previous measure is 10 milliseconds old (so \(d=0.1\)), then \(\alpha\) will be around \(0.05\). In other words, the new latency will account for 5% of the updated average. A bigger scale gives less weight to new measurements (compared to previous ones); a smaller one gives them more weight.

The default scale (if this method is not used) is 100 milliseconds. If unsure, try this default first and experiment only if it doesn't provide acceptable results (hosts are excluded too quickly or not fast enough, and tuning the exclusion threshold doesn't help).

Parameters:
scale the scale to use.
unit the unit of scale.
Returns:
this builder.
Throws:
java.lang.IllegalArgumentException if scale &lt;= 0.
        public Builder withScale(long scale, TimeUnit unit) {
            if (scale <= 0)
                throw new IllegalArgumentException("Invalid scale, must be strictly positive");
            this.scale = unit.toNanos(scale);
            return this;
        }

        
Sets the retry period for the resulting latency aware policy.

The retry period defines how long a node may be penalized by the policy before it is given a second chance. More precisely, a node is excluded from query plans if both its calculated average latency is exclusionThreshold times slower than the fastest node's average latency (at the time the query plan is computed) and its calculated average latency was last updated less than retryPeriod ago. Since penalized nodes will likely not see their latency updated, this is effectively how long the policy will exclude a node.

Parameters:
retryPeriod the retry period to use.
unit the unit for retryPeriod.
Returns:
this builder.
Throws:
java.lang.IllegalArgumentException if retryPeriod &lt; 0.
        public Builder withRetryPeriod(long retryPeriod, TimeUnit unit) {
            if (retryPeriod < 0)
                throw new IllegalArgumentException("Invalid retry period, must be positive");
            this.retryPeriod = unit.toNanos(retryPeriod);
            return this;
        }

        
Sets the update rate for the resulting latency aware policy. The update rate defines how often the minimum average latency is recomputed. While the average latency score of each node is computed iteratively (updated each time a new latency is collected), the minimum score needs to be recomputed from scratch every time, which is slightly more costly. For this reason, the minimum is only recalculated at the given fixed rate and cached between recalculations.

The default update rate is 100 milliseconds, which should be appropriate for most applications. In particular, note that while we want to avoid recomputing the minimum for every query, that computation is not particularly intensive either and there is no reason to use a very slow rate (more than a second is probably unnecessarily slow, for instance).

Parameters:
updateRate the update rate to use.
unit the unit for updateRate.
Returns:
this builder.
Throws:
java.lang.IllegalArgumentException if updateRate &lt;= 0.
        public Builder withUpdateRate(long updateRate, TimeUnit unit) {
            if (updateRate <= 0)
                throw new IllegalArgumentException("Invalid update rate value, must be strictly positive");
            this.updateRate = unit.toNanos(updateRate);
            return this;
        }

        
Sets the minimum number of measurements per-host to consider for the resulting latency aware policy.

Penalizing nodes is based on an average of their recently measured latency. This average is only meaningful if a minimum number of measurements have been collected (moreover, a newly started Cassandra node will tend to perform relatively poorly on its first queries due to JVM warm-up). This is what this option controls. If fewer than minMeasure data points have been collected for a given host, the policy will never penalize that host. Also, the first 30% of measurements are entirely ignored (in other words, the first 30% * minMeasure measurements to a node are entirely ignored, while the next 70% are accounted for in the computed latency, but the node won't be penalized until at least minMeasure measurements have been collected).

Note that the number of collected measurements for a given host is reset if the node is restarted.

The default for this option (if this method is not called) is 50. Note that it is probably not a good idea to set this option too low, if only to avoid the influence of JVM warm-up on newly restarted nodes.

Parameters:
minMeasure the minimum measurements to consider.
Returns:
this builder.
Throws:
java.lang.IllegalArgumentException if minMeasure &lt; 0.
        public Builder withMininumMeasurements(int minMeasure) {
            if (minMeasure < 0)
                throw new IllegalArgumentException("Invalid minimum measurements value, must be positive");
            this.minMeasure = minMeasure;
            return this;
        }
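The 30% figure described above corresponds to the `(30L * minMeasure) / 100L` warm-up threshold that Tracker passes to HostLatencyTracker. A quick standalone check of that arithmetic (the class and method names are illustrative, not part of the driver):

```java
// Quick check of the warm-up threshold arithmetic: with minMeasure = 50,
// the first 30% of measurements (i.e. 15) produce no latency score at all.
public class WarmupSketch {
    static long ignoredMeasurements(int minMeasure) {
        return (30L * minMeasure) / 100L; // integer division, as in the tracker
    }

    public static void main(String[] args) {
        System.out.println(ignoredMeasurements(50)); // prints 15
    }
}
```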

        
Builds a new latency aware policy using the options set on this builder.

Returns:
the newly created LatencyAwarePolicy.
        public LatencyAwarePolicy build() {
            return new LatencyAwarePolicy(childPolicy, exclusionThreshold, scale, retryPeriod, updateRate, minMeasure);
        }
    }