Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.github.kristofa.brave.zipkin;
  
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;
 
 
 import  org.apache.thrift.TException;
 
Sends spans to Zipkin collector or Scribe.

Typically the ZipkinSpanCollector should be a singleton in your application that can be used by both ClientTracer as ServerTracer.

This SpanCollector is implemented so it puts spans on a queue which are processed by a separate thread. In this way we are submitting spans asynchronously and we should have minimal overhead on application performance.

At this moment the number of processing threads is fixed and set to 1.

Author(s):
kristof
 
 public class ZipkinSpanCollector implements SpanCollector {
 
     private static final String UTF_8 = "UTF-8";
     private static final Logger LOGGER = LoggerFactory.getLogger(ZipkinSpanCollector.class);
 
     private final BlockingQueue<SpanspanQueue;
     private final ExecutorService executorService;
     private final List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
     private final Set<BinaryAnnotationdefaultAnnotations = new HashSet<BinaryAnnotation>();

    
Create a new instance with default queue size (= ZipkinSpanCollectorParams.DEFAULT_QUEUE_SIZE) and default batch size (= ZipkinSpanCollectorParams.DEFAULT_BATCH_SIZE).

Parameters:
zipkinCollectorHost Host for zipkin collector.
zipkinCollectorPort Port for zipkin collector.
 
     public ZipkinSpanCollector(final String zipkinCollectorHostfinal int zipkinCollectorPort) {
         this(zipkinCollectorHostzipkinCollectorPortnew ZipkinSpanCollectorParams());
     }

    
Create a new instance.

Parameters:
zipkinCollectorHost Host for zipkin collector.
zipkinCollectorPort Port for zipkin collector.
params Zipkin Span Collector parameters.
 
     public ZipkinSpanCollector(final String zipkinCollectorHostfinal int zipkinCollectorPort,
         final ZipkinSpanCollectorParams params) {
         Validate.notEmpty(zipkinCollectorHost);
         Validate.notNull(params);
          =
             new ZipkinCollectorClientProvider(zipkinCollectorHostzipkinCollectorPortparams.getSocketTimeout());
         try {
             .setup();
         } catch (final TException e) {
             if (params.failOnSetup()) {
                 throw new IllegalStateException(e);
             } else {
                 .error("Connection could not be established during setup."e);
             }
         }
          = new ArrayBlockingQueue<Span>(params.getQueueSize());
          = Executors.newFixedThreadPool(params.getNrOfThreads());
         for (int i = 1; i <= params.getNrOfThreads(); i++) {
             final SpanProcessingThread spanProcessingThread =
                 new SpanProcessingThread(params.getBatchSize());
             .add(spanProcessingThread);
             .add(.submit(spanProcessingThread));
         }
     }

    
 
    @Override
    public void collect(final Span span) {
        final long start = System.currentTimeMillis();
        if (!.isEmpty()) {
            for (final BinaryAnnotation ba : ) {
                span.addToBinary_annotations(ba);
            }
        }
        final boolean offer = .offer(span);
        if (!offer) {
            .error("Queue rejected Span, span not submitted: " + span);
        } else {
            final long end = System.currentTimeMillis();
            if (.isDebugEnabled()) {
                .debug("Adding span to queue took " + (end - start) + "ms.");
            }
        }
    }

    
    @Override
    public void addDefaultAnnotation(final String keyfinal String value) {
        Validate.notEmpty(key);
        Validate.notNull(value);
        try {
            final ByteBuffer bb = ByteBuffer.wrap(value.getBytes());
            final BinaryAnnotation binaryAnnotation = new BinaryAnnotation();
            binaryAnnotation.setKey(key);
            binaryAnnotation.setValue(bb);
            binaryAnnotation.setAnnotation_type(.);
            .add(binaryAnnotation);
        } catch (final UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    
    @Override
    @PreDestroy
    public void close() {
        .info("Stopping SpanProcessingThread.");
        for (final SpanProcessingThread thread : ) {
            thread.stop();
        }
        for (final Future<Integerfuture : ) {
            try {
                final Integer spansProcessed = future.get();
                .info("SpanProcessingThread processed " + spansProcessed + " spans.");
            } catch (final Exception e) {
                .error("Exception when getting result of SpanProcessingThread."e);
            }
        }
        .shutdown();
        .close();
        .info("ZipkinSpanCollector closed.");
    }
New to GrepCode? Check out our FAQ X