Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright (C) FuseSource, Inc. http://fusesource.com Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package org.fusesource.fabric.stream.log;
 
 
 import java.util.Map;

Author(s):
Hiram Chirino
 
 public class InputBatcher extends DefaultComponent {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(InputBatcherConsumer.class);
 
     public int batchSize = 1024*256;
     public long batchTimeout = 1000;
     public InputStream is = .;
 
     @Override
     protected Endpoint createEndpoint(String uriString remainingMap<StringObjectparametersthrows Exception {
         return new InputBatcherEndpoint(urithis);
     }
 
     class InputBatcherEndpoint extends DefaultEndpoint {
 
         InputBatcherEndpoint(String endpointUriComponent component) {
             super(endpointUricomponent);
         }
 
         @Override
         public boolean isSingleton() {
             return true;
         }
 
         @Override
         public Producer createProducer() throws Exception {
             throw new UnsupportedOperationException("Producer not supported!");
         }
 
         @Override
         public Consumer createConsumer(Processor processorthrows Exception {
             return new InputBatcherConsumer(thisprocessor);
         }
 
     }
     
     static final Object EOF = new Object();
     
     class InputBatcherConsumer extends DefaultConsumer {
 
         final ArrayBlockingQueue<Objectqueue = new ArrayBlockingQueue<Object>(1024);
 
         final ExecutorService inputReader = Executors.newSingleThreadExecutor();
         final ExecutorService batchReader = Executors.newSingleThreadExecutor();
         private AsyncProcessor processor;
 
 
         InputBatcherConsumer(Endpoint endpointProcessor processor) {
             super(endpointprocessor);
         }
         
         @Override
         protected void doStart() throws Exception {
             super.doStart();
 
             //
             // Start a thread which reads stdin and passes the data in big byte[] chunks
             // aligned at \n boundaries to an ArrayBlockingQueue
             //
            .execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        byte batch[] = new byte[4*1024];
                        int pos = 0;
                        while(isRunAllowed()) {
                            // do not poll if we are suspended
                            if (isSuspending() || isSuspended()) {
                                .trace("Consumer is suspended so skip polling");
                                try {
                                    Thread.sleep(1000);
                                } catch (InterruptedException e) {
                                    .debug("Sleep interrupted, are we stopping? {}"isStopping() || isStopped());
                                }
                                continue;
                            }
                            int count = .read(batchposbatch.length - pos);
                            ifcount < 0  ) {
                                ifpos > 0 ) {
                                    byte[] data = new byte[pos];
                                    System.arraycopy(batch, 0, data, 0, pos);
                                    .put(data);
                                }
                                .put();
                                return;
                            } else {
                                pos += count;
                                int at = lastnlposition(batchpos);
                                ifat >= 0 ) {
                                    int len = at+1;
                                    byte[] data = new byte[len];
                                    System.arraycopy(batch, 0, data, 0, len);
                                    int remaining = pos-len;
                                    System.arraycopy(batchlenbatch, 0,  remaining);
                                    pos = remaining;
//                                    System.err.println("Queued "+len+" bytes");
                                    .put(data);
                                } else if (pos == batch.length) {
                                    // no nl found in the 4k read so far.. pass it along.
                                    // only other alternative is to drop it and that's not so good.
//                                    System.err.println("queuing "+pos+" data, but no nl was in it");
                                    .put(batch);
                                    batch = new byte[4*1024];
                                    pos = 0;
                                } else {
//                                    System.err.println("at "+pos+" but no nl yet");
                                }
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            // In another thread
            .execute(new Runnable() {
                @Override
                public void run() {
                    boolean atEOF = false;
                    // loop while we are allowed, or if we are stopping loop until the queue is empty
                    while (isRunAllowed() && !atEOF) {
                        // do not poll if we are suspended
                        if (isSuspending() || isSuspended()) {
                            .trace("Consumer is suspended so skip polling");
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                .debug("Sleep interrupted, are we stopping? {}"isStopping() || isStopped());
                            }
                            continue;
                        }
                        ByteArrayOutputStream batch = new ByteArrayOutputStream((int) (*1.5));
                        try {
                            Object obj = .poll(1000, .);
                            if (obj != null) {
                                
                                // we are done. 
                                if(obj == ) {
                                    atEOF = true;
                                    continue;
                                }
                                
                                //starting a new batch..
                                long start = System.currentTimeMillis();
                                long timeout = start + ;
                                try {
//                                    System.err.println("batch start, size: "+batch.size());
                                    batch.write((byte[]) obj);
                                    // Fill in the rest of the batch up to the batch size or the batch timeout.
                                    while(batch.size() <  && !atEOF) {
                                        obj = .poll();
                                        ifobj!=null ) {
                                            if(obj == ) {
                                                atEOF = true;
                                            } else {
//                                                System.err.println("batch add, size: "+batch.size());
                                                batch.write((byte[]) obj);
                                            }
                                        } else {
                                            // gonna have to poll with a timeout..
                                            long remaining = timeout - System.currentTimeMillis();
                                            ifremaining > 0 ) {
//                                                System.err.println("batch waiting "+remaining);
                                                obj = .poll(remaining.);
                                                ifobj!=null ) {
                                                    if(obj == ) {
                                                        atEOF = true;
                                                    } else {
                                                        batch.write((byte[]) obj);
                                                    }
                                                    continue;
                                                }
                                                // else timeout..
                                            }
                                            // timeout.
//                                            System.err.println("batch poll timeout");
                                            break;
                                        }
                                    }
                                    if(batch.size() > 0 ) {
                                        byte[] body = batch.toByteArray();
                                        batch.reset();
                                        Exchange exchange = getEndpoint().createExchange();
                                        Message msg = new DefaultMessage();
                                        msg.setBody(body);
                                        exchange.setIn(msg);
                                        try {
//                                            System.err.println("sending: "+body.length);
                                            getProcessor().process(exchange);
//                                            System.err.println("sent");
                                            if (exchange.getException() != null) {
                                                getExceptionHandler().handleException("Error processing exchange"exchangeexchange.getException());
                                            }
                                        } catch (Exception e) {
                                            getExceptionHandler().handleException("Error processing exchange"exchangee);
                                        }
                                    }
                                } catch (Exception e) {
                                    .info("Error processing exchange."e);
                                }
                            }
                        } catch (InterruptedException e) {
                            .debug("Sleep interrupted, are we stopping? {}"isStopping() || isStopped());
                            continue;
                        }
                    }
                    
                    ifatEOF && isRunAllowed() ) {
                        // Send an exchange to signal the end of the stream.
                        Exchange exchange = getEndpoint().createExchange();
                        Message msg = new DefaultMessage();
                        msg.setHeader("EOF""true");
                        msg.setBody(new byte[0]);
                        exchange.setIn(msg);
                        try {
//                            System.err.println("Sending EOF signal");
                            getProcessor().process(exchange);
                            if (exchange.getException() != null) {
                                getExceptionHandler().handleException("Error processing exchange"exchangeexchange.getException());
                            }
                        } catch (Exception e) {
                            getExceptionHandler().handleException("Error processing exchange"exchangee);
                        }
                        // we are done processing... lets exit cleanly.
//                        System.err.println("EOF reached shutting down.");
                        System.exit(0);
                    }
                }
            });
        }
        @Override
        protected void doStop() throws Exception {
            .shutdown();
            .shutdown();
            super.doStop();
        }
    }
    private static int lastnlposition(byte[] dataint len) {
        // have we received an entire log line yet?
        int at = -1;
        for(int i=len-1; i >= 0; i--) {
            if(data[i] == '\n') {
                at = i;
                break;
            }
        }
        return at;
    }
    public int getBatchSize() {
        return ;
    }
    public void setBatchSize(int batchSize) {
        this. = batchSize;
    }
    public long getBatchTimeout() {
        return ;
    }
    public void setBatchTimeout(long batchTimeout) {
        this. = batchTimeout;
    }
    public InputStream getIs() {
        return ;
    }
    public void setIs(InputStream is) {
        this. = is;
    }
New to GrepCode? Check out our FAQ X