Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package io.advantageous.qbit.metrics.support;
  
  import org.slf4j.Logger;
  
 import java.net.*;
created by rhightower on 5/22/15.
 
 public class StatsDReplicator implements StatReplicatorQueueCallBackHandler {
 
     private final int flushRateIntervalMS;
     private final ByteBuffer sendBuffer;
     private final boolean multiMetrics;
 
     private final Random random = new Random();
     private final Logger logger = LoggerFactory.getLogger(StatsDReplicator.class);
     private final InetSocketAddress address;
     private final int bufferSize;
     private  DatagramChannel channel;
     private final ConcurrentHashMap<StringMetriccountMap = new ConcurrentHashMap<>();
 
 
     private long lastFlush;
     private long time;
     private long lastOpenTime;
 
     /*
     Sets
 
 StatsD supports counting unique occurences of events between flushes, using a Set to store all occurring events.
 
 uniques:765|s
 If the count at flush is 0 then you can opt to send no metric at all for this set, by setting config.deleteSets.
      */
 
     public StatsDReplicator(String hostint portboolean multiMetricsint bufferSizeint flushRateIntervalMSthrows IOException {
         this(InetAddress.getByName(host), portmultiMetricsbufferSizeflushRateIntervalMS);
     }
 
     public StatsDReplicator(InetAddress hostint portboolean multiMetricsint bufferSizeint flushRateIntervalMSthrows IOException {
          = new InetSocketAddress(hostport);
 
         this. = bufferSize;
 
         openChannel();
 
         this. = multiMetrics;
         this. = flushRateIntervalMS;
          = ByteBuffer.allocate(bufferSize + 100);
 
     }
 
     private void openChannel() {
 
 
          = Timer.timer().now();
 
 
         try {
 
             if (!=null) {
                 try {
                     .close();
                 }catch (Exception ex) {
                     .debug("unable to clean up channel connection"ex);
                 }
             }
              = DatagramChannel.open();
              = ;
             .configureBlocking(false);
             .setOption(. * 2);
 
         }catch (Exception ex) {
             .error("Unable to open channel"ex);
         }
     }
 
     protected void finalize() throws Throwable {
         super.finalize();
         flushStatSend();
     }
 
 
     @SuppressWarnings("UnusedReturnValue")
     public boolean timing(String keyint value) {
 
         return timing(keyvalue, 1.0);
    }
    public boolean timing(String keyint valuedouble sampleRate) {
        return send(sampleRate, String.format(."%s:%d|ms"keyvalue));
    }
    public boolean decrement(String key) {
        return increment(key, -1, 1.0);
    }
    public boolean decrement(String keyint magnitude) {
        return decrement(keymagnitude, 1.0);
    }
    public boolean decrement(String keyint magnitudedouble sampleRate) {
        magnitude = magnitude < 0 ? magnitude : -magnitude;
        return increment(keymagnitudesampleRate);
    }
    public boolean decrement(String... keys) {
        return increment(-1, 1.0, keys);
    }
    public boolean decrement(int magnitudeString... keys) {
        magnitude = magnitude < 0 ? magnitude : -magnitude;
        return increment(magnitude, 1.0, keys);
    }
    public boolean decrement(int magnitudedouble sampleRateString... keys) {
        magnitude = magnitude < 0 ? magnitude : -magnitude;
        return increment(magnitudesampleRatekeys);
    }
    public boolean increment(String key) {
        return increment(key, 1, 1.0);
    }
    @SuppressWarnings("UnusedReturnValue")
    public boolean increment(String keyint magnitude) {
        return increment(keymagnitude, 1.0);
    }
    public boolean increment(String keyint magnitudedouble sampleRate) {
        String stat = String.format(."%s:%s|c"keymagnitude);
        return send(sampleRatestat);
    }
    public boolean increment(int magnitudedouble sampleRateString... keys) {
        String[] stats = new String[keys.length];
        for (int i = 0; i < keys.lengthi++) {
            stats[i] = String.format(."%s:%s|c"keys[i], magnitude);
        }
        return send(sampleRatestats);
    }
    @SuppressWarnings("UnusedReturnValue")
    public boolean gauge(String keydouble magnitude) {
        return gauge(keymagnitude, 1.0);
    }
    public boolean gauge(String keydouble magnitudedouble sampleRate) {
        final String stat = String.format(."%s:%s|g"keymagnitude);
        return send(sampleRatestat);
    }
    private boolean send(double sampleRateString... stats) {
        boolean sentSomething = false// didn't send anything
        if (sampleRate < 1.0) {
            for (String stat : stats) {
                if (.nextDouble() <= sampleRate) {
                    stat = String.format(."%s|@%f"statsampleRate);
                    if (doSend(stat)) {
                        sentSomething = true;
                    }
                }
            }
        } else {
            for (String stat : stats) {
                if (doSend(stat)) {
                    sentSomething = true;
                }
            }
        }
        return sentSomething;
    }
    private boolean doSend(String stat) {
        try {
            final byte[] data = stat.getBytes("utf-8");
            // If we're going to go past the threshold of the buffer then flush.
            // the +1 is for the potential '\n' in multi_metrics below
            if (.remaining() < (data.length + 1)) {
                if (!flushStatSend()) {
                    .error("Buffer overflow, connection might be down");
                    return false;
                }
            }
            if (.position() > 0) {         // multiple metrics are separated by '\n'
                .put((byte'\n');
            }
            .put(data);   // append the data
            if (!) {
                flushStatSend();
            }
            return true;
        } catch (IOException e) {
            .error(
                    String.format("Could not send stat %s to host %s:%d",
                            .toString(), .getHostName(),
                            .getPort()), e);
            return false;
        }
    }
    @SuppressWarnings("UnusedReturnValue")
    public boolean flushStatSend() {
        try {
            final int sizeOfBuffer = .position();
            if (sizeOfBuffer <= 0) {
                return false;
            }
            final int sentByteCount = sendBufferOverChannel();
            if (sizeOfBuffer == sentByteCount) {
                return true;
            } else {
                .error(String.format(
                        "Could not send all of stat %s to host %s:%d. Only sent %d bytes out of %d bytes".toString(),
                        .getHostName(), .getPort(), sentByteCountsizeOfBuffer));
                return false;
            }
        } catch (IOException e) {
            .error(
                    String.format("Could not send stat %s to host %s:%d".toString(), .getHostName(),
                            .getPort()), e);
            return false;
        }
    }
    int resetDatagramEvery = 0;
    private int sendBufferOverChannel() throws IOException {
        if (++ > 10) {
            openChannel();
            =0;
        }
        try {
            .flip();
            /* Made this async. */
            final int sentByteCount = .send();
            .limit(.capacity());
            .rewind();
            return sentByteCount;
        }catch (IOException ex) {
            DatagramChannel oldChannel = ;
             = null;
            /* Added recovery logic. */
            if (oldChannel!=null) {
                oldChannel.close();
            }
            openChannel();
            return 0;
        }
    }
    @Override
    public void replicateCount(final String namefinal int countfinal long time) {
        if (count == 0) {
            return;
        }
       Metric localCount = .get(name);
       if (localCount == null) {
           localCount = Metric.count(name);
            .put(namelocalCount);
        }
        localCount.value += count;
    }
    @Override
    public void replicateLevel(final String namefinal int levelfinal long time) {
        Metric localCount = .get(name);
        if (localCount == null) {
            localCount = Metric.level(name);
            .put(namelocalCount);
            /* Set the initial level. */
            localCount.value = level;
            /* Send the gauge. */
            gauge(namelevel);
        }
        localCount.value = level;
    }
    @Override
    public void replicateTiming(String nameint timedlong time) {
        /* A 0 timing is not useful. */
        if (timed <= 0) {
            return;
        }
        Metric localCount = .get(name);
        if (localCount == null) {
            localCount = Metric.timing(name);
            .put(namelocalCount);
            /* Set the initial timing. */
            localCount.value = timed;
            /* Send the timing. */
            timing(nametimed);
        }


        
It would be nice to average the time.
        localCount.value = timed;
    }
    private void flushIfNeeded() {
        long delta =  - ;
        if (delta > ) {
            .entrySet().forEach(entry -> {
                if (entry.getValue(). != 0) {
                    switch (entry.getValue().) {
                        case :
                            increment(entry.getKey(), entry.getValue().);
                            break;
                        case :
                            timing(entry.getKey(), entry.getValue().);
                            break;
                        case :
                            gauge(entry.getKey(), entry.getValue().);
                            break;
                    }
                    entry.getValue(). = 0;
                }
            });
            flushStatSend();
             = ;
        }
    }
    @Override
    public void queueProcess() {
         = Timer.timer().now();
        flushIfNeeded();
        /* Reopen channel every hour so if there is a problem like last time
        we are at least fixing it once an hour.
         */
        if ( -  > (60 * 60 * 1000) ||  == null) {
            openChannel();
        }
    }
    enum MetricType {
        COUNT, LEVEL, TIMING;
    }
    final static class Metric {
        int value;
        final String name;
        final MetricType type;
        public static Metric count(String name) {
            return new Metric(name.);
        }
        public static Metric level(String name) {
            return new Metric(name.);
        }
        public static Metric timing(String name) {
            return new Metric(name.);
        }
        public Metric(String nameMetricType type) {
            this. = name;
            this. = type;
        }
    }
New to GrepCode? Check out our FAQ X