Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright (C) FuseSource, Inc. http://fusesource.com Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package org.fusesource.fabric.stream.log;
 
 
 import javax.jms.*;
 import java.io.File;
 
 import static org.fusesource.fabric.stream.log.Support.compress;
 import static org.fusesource.fabric.stream.log.Support.displayResourceFile;

Author(s):
Hiram Chirino
 
 public class Producer {
     
     private String logFilePattern = null;
     private File positionFile = null;
     private String broker;
     private String destination;
     private int batchSize = 1024*64;
     private long batchTimeout = 1000*5;
     private boolean compress = true;
     private InputStream is = null;
 
     public static void main(String[] argsthrows Exception {
         Producer producer = new Producer();
         
         // Process the arguments
         LinkedList<Stringargl = new LinkedList<String>(Arrays.asList(args));
         while(!argl.isEmpty()) {
             try {
                 String arg = argl.removeFirst();
                 if"--help".equals(arg) ) {
                     displayHelpAndExit(0);
                 } else if"--broker".equals(arg) ) {
                     producer.broker = shift(argl);
                 } else if"--destination".equals(arg) ) {
                     producer.destination = shift(argl);
                 } else if"--batch-size".equals(arg) ) {
                     producer.batchSize = Integer.parseInt(shift(argl));
                 } else if"--batch-timeout".equals(arg) ) {
                     producer.batchTimeout =  Long.parseLong(shift(argl));
                 } else if"--compress".equals(arg) ) {
                     producer.compress = Boolean.parseBoolean(shift(argl));
                 } else if"--log-file".equals(arg) ) {
                     producer.logFilePattern = shift(argl);
                 } else if"--position-file".equals(arg) ) {
                     producer.positionFile = new File(shift(argl));
                 } else {
                     ..println("Invalid usage: unknown option: "+arg);
                     displayHelpAndExit(1);
                 }
             } catch (NumberFormatException e) {
                 ..println("Invalid usage: argument not a number");
                 displayHelpAndExit(1);
             }
         }
 
         ifproducer.logFilePattern!=null ^ producer.positionFile!=null ) {
             ..println("Invalid usage: --log-file and --position-file but both be set.");
             displayHelpAndExit(1);
         }
         ifproducer.broker==null ) {
             ..println("Invalid usage: --broker option not specified.");
             displayHelpAndExit(1);
         }
         ifproducer.destination==null ) {
             ..println("Invalid usage: --destination option not specified.");
             displayHelpAndExit(1);
         }
 
         producer.execute();
         System.exit(0);
    }
    private static String shift(LinkedList<Stringargl) {
        if(argl.isEmpty()) {
            ..println("Invalid usage: Missing argument");
            displayHelpAndExit(1);
        }
        return argl.removeFirst();
    }
    private static void displayHelpAndExit(int exitCode) {
        displayResourceFile("producer-usage.txt");
        System.exit(exitCode);
    }
    private void execute() throws Exception {
        LogStreamer source = configure();
        source.start();
        // block until the process is killed.
        synchronized (this) {
            while(true) {
                this.wait();
            }
        }
    }
    public LogStreamer configure() throws Exception {
        Processor processor = new Processor() {
            Connection connection;
            Session session;
            ActiveMQMessageProducer producer;
            @Override
            public void start() throws JMSException {
                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
                 = factory.createConnection();
                .start();
                 = .createSession(false.);
                if(!=null) {
                    .setDeliveryMode(.);
                } else {
                    .setDeliveryMode(.);
                }
            }
            @Override
            public void stop() {
                new Thread() {
                    @Override
                    public void run() {
                        try {
                            .stop();
                        } catch (JMSException e) {
                        }
                    }
                }.start();
            }
            @Override
            public void send(HashMap<StringStringheadersbyte[] datafinal Callback onComplete) {
                try {
                    BytesMessage msg = .createBytesMessage();
                    .send(msgnew AsyncCallback(){
                        public void onSuccess() {
                            onComplete.onSuccess();
                        }
                        public void onException(JMSException exception) {
                            onComplete.onFailure(exception);
                        }
                    });
                } catch (JMSException e) {
                    onComplete.onFailure(e);
                }
            }
        };
        if() {
            final Processor next = processor;
            processor = new Processor() {
                @Override
                public void start() throws Exception {
                    next.start();
                }
                @Override
                public void stop() {
                    next.stop();
                }
                @Override
                public void send(HashMap<StringStringheadersbyte[] dataCallback onComplete) {
                    next.send(headerscompress(data), onComplete);
                }
            };
        }
        LogStreamer streamer = new LogStreamer();
        streamer.setBatchSize();
        streamer.setBatchTimeout();
        streamer.setIs();
        streamer.setLogFilePattern();
        streamer.setPositionFile();
        if==null ) {
            streamer.setExitOnEOF(true);
        }
        streamer.setProcessor(processor);
        return streamer;
    }
    public int getBatchSize() {
        return ;
    }
    public void setBatchSize(int batchSize) {
        this. = batchSize;
    }
    public long getBatchTimeout() {
        return ;
    }
    public void setBatchTimeout(long batchTimeout) {
        this. = batchTimeout;
    }
    public String getBroker() {
        return ;
    }
    public void setBroker(String broker) {
        this. = broker;
    }
    public boolean isCompress() {
        return ;
    }
    public void setCompress(boolean compress) {
        this. = compress;
    }
    public String getDestination() {
        return ;
    }
    public void setDestination(String destination) {
        this. = destination;
    }
    public InputStream getIs() {
        return ;
    }
    public void setIs(InputStream is) {
        this. = is;
    }
    public String getLogFilePattern() {
        return ;
    }
    public void setLogFilePattern(String logFilePattern) {
        this. = logFilePattern;
    }
    public File getPositionFile() {
        return ;
    }
    public void setPositionFile(File positionFile) {
        this. = positionFile;
    }
New to GrepCode? Check out our FAQ X