Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.yammer.metrics.stats;
  
  
 
 import java.util.ArrayList;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

 import static java.lang.Math.exp;
 import static java.lang.Math.min;

An exponentially-decaying random sample of longs. Uses Cormode et al's forward-decaying priority reservoir sampling method to produce a statistically representative sample, exponentially biased towards newer entries.

See also:
Cormode et al. Forward Decay: A Practical Time Decay Model for Streaming Systems. ICDE '09: Proceedings of the 2009 IEEE International Conference on Data Engineering (2009)
 
 public class ExponentiallyDecayingSample implements Sample {
     private static final long RESCALE_THRESHOLD = ..toNanos(1);
     private final ConcurrentSkipListMap<DoubleLongvalues;
     private final ReentrantReadWriteLock lock;
     private final double alpha;
     private final int reservoirSize;
     private final AtomicLong count = new AtomicLong(0);
     private volatile long startTime;
     private final AtomicLong nextScaleTime = new AtomicLong(0);
     private final Clock clock;

    
Creates a new ExponentiallyDecayingSample.

Parameters:
reservoirSize the number of samples to keep in the sampling reservoir
alpha the exponential decay factor; the higher this is, the more biased the sample will be towards newer values
 
     public ExponentiallyDecayingSample(int reservoirSizedouble alpha) {
         this(reservoirSizealpha, Clock.defaultClock());
     }

    
Creates a new ExponentiallyDecayingSample.

Parameters:
reservoirSize the number of samples to keep in the sampling reservoir
alpha the exponential decay factor; the higher this is, the more biased the sample will be towards newer values
 
     public ExponentiallyDecayingSample(int reservoirSizedouble alphaClock clock) {
         this. = new ConcurrentSkipListMap<DoubleLong>();
         this. = new ReentrantReadWriteLock();
         this. = alpha;
         this. = reservoirSize;
         this. = clock;
         clear();
     }
 
     @Override
     public void clear() {
         lockForRescale();
         try {
             .clear();
             .set(0);
             this. = currentTimeInSeconds();
             .set(.tick() + );
         } finally {
             unlockForRescale();
         }
     }
 
     @Override
     public int size() {
         return (intmin(.get());
     }
 
     @Override
     public void update(long value) {
         update(valuecurrentTimeInSeconds());
     }

    
Adds an old value with a fixed timestamp to the sample.

Parameters:
value the value to be added
timestamp the epoch timestamp of value in seconds
 
     public void update(long valuelong timestamp) {
 
         rescaleIfNeeded();
 
         lockForRegularUsage();
         try {
             final double priority = weight(timestamp - ) / ThreadLocalRandom.current()
                                                                                      .nextDouble();
             final long newCount = .incrementAndGet();
             if (newCount <= ) {
                .put(priorityvalue);
            } else {
                Double first = .firstKey();
                if (first < priority) {
                    if (.putIfAbsent(priorityvalue) == null) {
                        // ensure we always remove an item
                        while (.remove(first) == null) {
                            first = .firstKey();
                        }
                    }
                }
            }
        } finally {
            unlockForRegularUsage();
        }
    }
    private void rescaleIfNeeded() {
        final long now = .tick();
        final long next = .get();
        if (now >= next) {
            rescale(nownext);
        }
    }
    @Override
    public Snapshot getSnapshot() {
        lockForRegularUsage();
        try {
            return new Snapshot(.values());
        } finally {
            unlockForRegularUsage();
        }
    }
    private long currentTimeInSeconds() {
        return ..toSeconds(.time());
    }
    private double weight(long t) {
        return exp( * t);
    }
    /* "A common feature of the above techniques—indeed, the key technique that
     * allows us to track the decayed weights efficiently—is that they maintain
     * counts and other quantities based on g(ti − L), and only scale by g(t − L)
     * at query time. But while g(ti −L)/g(t−L) is guaranteed to lie between zero
     * and one, the intermediate values of g(ti − L) could become very large. For
     * polynomial functions, these values should not grow too large, and should be
     * effectively represented in practice by floating point values without loss of
     * precision. For exponential functions, these values could grow quite large as
     * new values of (ti − L) become large, and potentially exceed the capacity of
     * common floating point types. However, since the values stored by the
     * algorithms are linear combinations of g values (scaled sums), they can be
     * rescaled relative to a new landmark. That is, by the analysis of exponential
     * decay in Section III-A, the choice of L does not affect the final result. We
     * can therefore multiply each value based on L by a factor of exp(−α(L′ − L)),
     * and obtain the correct value as if we had instead computed relative to a new
     * landmark L′ (and then use this new L′ at query time). This can be done with
     * a linear pass over whatever data structure is being used."
     */
    private void rescale(long nowlong next) {
        if (.compareAndSet(nextnow + )) {
            lockForRescale();
            try {
                final long oldStartTime = ;
                this. = currentTimeInSeconds();
                final ArrayList<Doublekeys = new ArrayList<Double>(.keySet());
                for (Double key : keys) {
                    final Long value = .remove(key);
                    .put(key * exp(- * ( - oldStartTime)), value);
                }
                // make sure the counter is in sync with the number of stored samples.
                .set(.size());
            } finally {
                unlockForRescale();
            }
        }
    }
    private void unlockForRescale() {
        .writeLock().unlock();
    }
    private void lockForRescale() {
        .writeLock().lock();
    }
    private void lockForRegularUsage() {
        .readLock().lock();
    }
    private void unlockForRegularUsage() {
        .readLock().unlock();
    }
New to GrepCode? Check out our FAQ X