Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.github.kristofa.brave.zipkin;
  
  import java.io.ByteArrayOutputStream;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.Callable;
  import java.util.concurrent.TimeUnit;

  import org.apache.thrift.TException;
  import org.apache.thrift.protocol.TBinaryProtocol;
  import org.apache.thrift.protocol.TProtocol;
  import org.apache.thrift.protocol.TProtocolFactory;
  import org.apache.thrift.transport.TIOStreamTransport;
  // NOTE(review): the imports for Span, LogEntry, Client, Base64, Validate, Logger and LoggerFactory are missing
  // from this listing; restore them from the project's dependencies (exact packages not visible here).
 
Thread implementation that is responsible for submitting spans to the Zipkin span collector or Scribe. The thread takes spans from a queue. The spans are produced by ZipkinSpanCollector put on a queue and consumed and processed by this thread.

We will try to buffer spans and send them in batches to minimize communication overhead. However if the batch size is not reached within 2 polls (max 10 seconds) the available spans will be sent over anyway.

Author(s):
kristof
See also:
ZipkinSpanCollector
 
 class SpanProcessingThread implements Callable<Integer> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SpanProcessingThread.class);
     private static final int MAX_SUBSEQUENT_EMPTY_BATCHES = 2;
 
     private final BlockingQueue<Spanqueue;
     private final Base64 base64 = new Base64();
     private final TProtocolFactory protocolFactory;
     private boolean stop = false;
     private int processedSpans = 0;
     private final List<LogEntrylogEntries;
     private final int maxBatchSize;

    
Creates a new instance.

Parameters:
queue BlockingQueue that will provide spans.
clientProvider ThriftClientProvider that provides client used to submit spans to zipkin span collector.
maxBatchSize Max batch size. Indicates how many spans we submit to collector in 1 go.
 
     public SpanProcessingThread(final BlockingQueue<Spanqueuefinal ZipkinCollectorClientProvider clientProvider,
         final int maxBatchSize) {
         Validate.notNull(queue);
         Validate.notNull(clientProvider);
         Validate.isTrue(maxBatchSize > 0);
         this. = queue;
         this. = clientProvider;
          = new TBinaryProtocol.Factory();
         this. = maxBatchSize;
          = new ArrayList<LogEntry>(maxBatchSize);
     }

    
Requests the thread to stop.
 
     public void stop() {
          = true;
     }

    
 
     @Override
     public Integer call() {
 
         int subsequentEmptyBatches = 0;
         do {
 
             try {
                 final Span span = .poll(5, .);
                 if (span == null) {
                     subsequentEmptyBatches++;
 
                 } else {
                     .add(create(span));
                 }
 
                 if (subsequentEmptyBatches >=  && !.isEmpty()
                     || .size() >=  || !.isEmpty() && ) {
                     log();
                     .clear();
                     subsequentEmptyBatches = 0;
                 }
             } catch (final Exception e) {
                .warn("Unexpected exception flushing spans"e);
            }
        } while ( == false);
        return ;
    }
    private void log(final List<LogEntrylogEntries) {
        final long start = System.currentTimeMillis();
        final boolean success = log(.getClient(), logEntries);
         += logEntries.size();
        if (success && .isDebugEnabled()) {
            final long end = System.currentTimeMillis();
            .debug("Submitting " + logEntries.size() + " spans to service took " + (end - start) + "ms.");
        }
    }
    private boolean log(final Client clientfinal List<LogEntrylogEntries) {
        try {
            .getClient().Log(logEntries);
            return true;
        } catch (final TException e) {
            .debug("Exception when trying to log Span.  Will retry: "e.getMessage());
            final Client newClient = .exception(e);
            if (newClient != null) {
                .debug("Got new client with new connection. Logging with new client.");
                try {
                    newClient.Log(logEntries);
                    return true;
                } catch (final TException e2) {
                    .warn("Logging spans failed. " + logEntries.size() + " spans are lost!"e2);
                }
            } else {
                .warn("Logging spans failed (couldn't establish connection). " + logEntries.size() + " spans are lost!");
            }
        }
        return false;
    }
    private LogEntry create(final Span spanthrows TException {
        final String spanAsString = .encodeToString(spanToBytes(span));
        return new LogEntry("zipkin"spanAsString);
    }
    private byte[] spanToBytes(final Span thriftSpanthrows TException {
        final ByteArrayOutputStream buf = new ByteArrayOutputStream();
        final TProtocol proto = .getProtocol(new TIOStreamTransport(buf));
        thriftSpan.write(proto);
        return buf.toByteArray();
    }
New to GrepCode? Check out our FAQ X