Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * Copyright (C) 2010, FuseSource Corp. All rights reserved.
 */
  
  package org.fusesource.fabric.stream.log;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  import java.io.*;
  import java.util.Arrays;
  import java.util.HashMap;
  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Semaphore;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;

  import static org.fusesource.fabric.stream.log.Support.*;

/**
 * @author Hiram Chirino
 */
 
 class LogStreamer {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(LogStreamer.class);
 
     static final Object EOF = new Object();
     static class QueueEntry {
         private final byte[] data;
         private final long file;
         private final long offset;
         private final int size;
 
         QueueEntry(byte data[], long filelong offsetint size) {
             this. = data;
             this. = file;
             this. = offset;
             this. = size;
         }
     }
 
     final ExecutorService inputReader = Executors.newSingleThreadExecutor();
     final ExecutorService batchReader = Executors.newSingleThreadExecutor();
     final private AtomicBoolean runAllowed = new AtomicBoolean(false);
     final ArrayBlockingQueue<Objectqueue = new ArrayBlockingQueue<Object>(1024);
 
 
     public String logFilePattern = null;
     public File positionFile = null;
     public int batchSize = 1024*256;
     public long batchTimeout = 1000;
     public long tailRetry = 500;
     public InputStream is;
     public boolean exitOnEOF;
     public Processor processor;
 
     private void updateLogPosition(long filelong offset) {
         if !=null ) {
             try {
                 writeText(, String.format("%d:%d\n"fileoffset));
             } catch (IOException e) {
                 new RuntimeException(e);
             }
         }
     }
 
     private boolean logFileExists(long file) {
         return new File(String.format(file)).exists();
     }
 
     private boolean isRunAllowed() {
         return .get();
     }
 
     public void start() {
         if(.compareAndSet(falsetrue)) {
             try {
                 .start();
             } catch (Exception e) {
                 .set(false);
                 return;
             }
 
             //
             // Start a thread which reads stdin and passes the data in big byte[] chunks
             // aligned at \n boundaries to an ArrayBlockingQueue
             //
             .execute(new Runnable() {
                 @Override
                 public void run() {
                     readInput();
                 }
             });
 
             // In another thread
             .execute(new Runnable() {
                 @Override
                 public void run() {
                     drainBatchQueue();
                 }
            });
        }
    }
    public void stop() {
        if(.compareAndSet(truefalse)) {
            .shutdown();
            .shutdown();
            .stop();
        }
        .set(false);
    }
    private void readInput() {
        if!=null ) {
            long currentFile = -1;
            long currentOffset = -1;
            try {
                if( !.exists() ) {
                    writeText("0:0");
                }
                String data = readText().trim();
                String[] split = data.split(":");
                currentFile = Long.parseLong(split[0]);
                currentOffset = Long.parseLong(split[1]);
                while(.get()) {
                    File current = new File(String.format(currentFile));
                    FileInputStream is = new FileInputStream(current);
                    try {
                        process(iscurrentFilecurrentOffset);
                        currentFile ++;
                        currentOffset = 0;
                    } finally {
                        is.close();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                return;
            }
        } else {
            if(==null) {
                 = .;
            }
            try {
                process(, 0, 0);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    private boolean process(InputStream islong filelong offsetthrows IOExceptionInterruptedException {
        // Skip to the offset.
        ifoffset > 0 ) {
            is.skip(offset);
        }
        int pos = 0;
        byte batch[] = new byte[4*1024];
        boolean eof_possible = false;
        while(isRunAllowed()) {
            int count = is.read(batchposbatch.length - pos);
            ifcount < 0  ) {
                if==null || eof_possible ) {
                    ifpos > 0 ) {
                        .put(new QueueEntry(Arrays.copyOf(batchpos), fileoffsetpos));
                    }
                    .put();
                    return true;
                } else {
                    // We won't move off the current log file until the next one is created.
                    iflogFileExists(file+1) ) {
                        // To to read 1 more time.. and then switch..
                        eof_possible = true;
                        continue;
                    } else {
                        eof_possible = false;
                        Thread.sleep();
                    }
                }
            } else {
                eof_possible = false;
                pos += count;
                int at = lastnlposition(batchpos);
                ifat >= 0 ) {
                    int len = at+1;
                    byte[] data = Arrays.copyOf(batchlen);
                    int remaining = pos-len;
                    System.arraycopy(batchlenbatch, 0,  remaining);
                    pos = remaining;
                    .put(new QueueEntry(datafileoffsetlen));
                }
                if (pos == batch.length) {
                    .put(new QueueEntry(batchfileoffsetpos));
                    batch = new byte[batch.length];
                    pos = 0;
                }
            }
            offset += count;
        }
        return false;
    }
    private void drainBatchQueue() {
        while(isRunAllowed()) {
            boolean atEOF = false;
            // loop while we are allowed, or if we are stopping loop until the queue is empty
            while (isRunAllowed() && !atEOF) {
                QueueEntry firstEntry = null;
                QueueEntry lastEntry;
                ByteArrayOutputStream batch = new ByteArrayOutputStream((int) (*1.5));
                try {
                    Object obj = .poll(1000, .);
                    if (obj == null) {
                        continue;
                    }
                    // we are done.
                    if(obj == ) {
                        atEOF = true;
                        continue;
                    }
                    //starting a new batch..
                    long start = System.currentTimeMillis();
                    long timeout = start + ;
                    lastEntry = (QueueEntry)obj;
                    iffirstEntry == null ) {
                        firstEntry = lastEntry;
                    }
                    batch.write(lastEntry.data);
                    // Fill in the rest of the batch up to the batch size or the batch timeout.
                    while(batch.size() <  && !atEOF) {
                        obj = .poll();
                        ifobj!=null ) {
                            if(obj == ) {
                                atEOF = true;
                            } else {
                                lastEntry = (QueueEntry)obj;
                                batch.write(lastEntry.data);
                            }
                        } else {
                            // gonna have to poll with a timeout..
                            long remaining = timeout - System.currentTimeMillis();
                            ifremaining > 0 ) {
                                obj = .poll(remaining.);
                                ifobj!=null ) {
                                    if(obj == ) {
                                        atEOF = true;
                                    } else {
                                        lastEntry = (QueueEntry)obj;
                                        batch.write(lastEntry.data);
                                    }
                                    continue;
                                }
                            }
                            // timeout.
                            break;
                        }
                    }
                    if(batch.size() > 0 ) {
                        byte[] body = batch.toByteArray();
                        batch.reset();
                        assert firstEntry.file == lastEntry.file;
                        HashMap<StringStringheaders = new HashMap<StringString>();
                        headers.put("at", String.format("%d:%d"firstEntry.filefirstEntry.offset));
                        final QueueEntry entry = lastEntry;
                        send(headerslastEntrybodynew Runnable() {
                            @Override
                            public void run() {
                                updateLogPosition(entry.fileentry.offset + entry.size);
                            }
                        });
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    stop();
                }
            }
            ifatEOF && isRunAllowed() ) {
                HashMap<StringStringheaders = new HashMap<StringString>();
                headers.put("EOF""true");
                send(headersnullnew byte[0], new Runnable() {
                    @Override
                    public void run() {
                        if ) {
                            System.exit(0);
                        }
                    }
                });
            }
        }
    }
    public Semaphore sendSemaphore = new Semaphore(10);
    private void send(HashMap<StringStringheadersfinal QueueEntry lastEntrybyte[] bodyfinal Runnable onComplete) {
        // We send up to 10 batches before this semaphore blocks.
        // blocks waiting for a response.
        try {
            .acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        .send(headersbodynew Callback(){
            @Override
            public void onSuccess() {
                try {
                    .release();
                    ifonComplete!=null ) {
                        onComplete.run();
                    }
                } catch (Throwable e) {
                    onFailure(e);
                }
            }
            @Override
            public void onFailure(Throwable e) {
                e.printStackTrace();
                stop();
            }
        });
    }
    public int getBatchSize() {
        return ;
    }
    public void setBatchSize(int batchSize) {
        this. = batchSize;
    }
    public long getBatchTimeout() {
        return ;
    }
    public void setBatchTimeout(long batchTimeout) {
        this. = batchTimeout;
    }
    public InputStream getIs() {
        return ;
    }
    public void setIs(InputStream is) {
        this. = is;
    }
    public boolean isExitOnEOF() {
        return ;
    }
    public void setExitOnEOF(boolean exitOnEOF) {
        this. = exitOnEOF;
    }
    public String getLogFilePattern() {
        return ;
    }
    public void setLogFilePattern(String logFilePattern) {
        this. = logFilePattern;
    }
    public File getPositionFile() {
        return ;
    }
    public void setPositionFile(File positionFile) {
        this. = positionFile;
    }
    public Processor getProcessor() {
        return ;
    }
    public void setProcessor(Processor processor) {
        this. = processor;
    }
    public long getTailRetry() {
        return ;
    }
    public void setTailRetry(long tailRetry) {
        this. = tailRetry;
    }
New to GrepCode? Check out our FAQ X