Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  //  The contents of this file are subject to the Mozilla Public License
  //  Version 1.1 (the "License"); you may not use this file except in
  //  compliance with the License. You may obtain a copy of the License
  //  at http://www.mozilla.org/MPL/
  //
  //  Software distributed under the License is distributed on an "AS IS"
  //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  //  the License for the specific language governing rights and
  //  limitations under the License.
 //
 //  The Original Code is RabbitMQ.
 //
 //  The Initial Developer of the Original Code is VMware, Inc.
 //  Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
 //
 
 package com.rabbitmq.tools;
 
 import java.util.Map;
 
AMQP Protocol Analyzer program. Listens on a port (in-port) and when a connection arrives, makes an outbound connection to a host and port (out-port). Relays frames from the in-port to the out-port. Commands are decoded and printed to a supplied Tracer.Logger.

The stand-alone program (main(java.lang.String[])) prints to System.out, using a private Tracer.AsyncLogger instance. When the connection closes the program listens for a subsequent connection and traces that to the same Tracer.Logger. This continues until the program is interrupted.

Options for controlling, for example, whether command bodies are decoded, are obtained from System.properties, and are reported to the console before starting the trace.

A Tracer object may be instantiated, using one of the constructors

  • Tracer(int listenPort, String id, String host, int port, Logger logger, Properties props)
  • Tracer(String id)
  • Tracer(String id, Properties props)

    where the missing parameters default as follows:

    • listenPort defaults to 5673
    • host defaults to localhost
    • port defaults to 5672
    • logger defaults to new AsyncLogger(System.out)
    • and
    • props defaults to System.getProperties()

These constructors block waiting for a connection to arrive on the listenPort. Tracing does not begin until the tracer is start()ed which Tracer.Logger.start()s the supplied logger and creates and starts a java.lang.Thread for relaying and deconstructing the frames.

The properties specified in props are used at start() time and may be modified before this call.

 
 public class Tracer implements Runnable {
     private static final int DEFAULT_LISTEN_PORT = 5673;
     private static final String DEFAULT_CONNECT_HOST = "localhost";
     private static final int DEFAULT_CONNECT_PORT = 5672;
 
     private static final String PROP_PREFIX = "com.rabbitmq.tools.Tracer.";
 
     private static boolean getBoolProperty(String propertyNameProperties props) {
         return Boolean.parseBoolean(props.getProperty( + propertyName));
     }
 
     private static void printBoolProperty(String propNameProperties props) {
         StringBuilder sb = new StringBuilder(100);
         ..println(sb.append().append(propName)
                 .append(" = ").append(getBoolProperty(propNameprops)).toString());
     }
 
    public static void main(String[] args) {
        int listenPort = args.length > 0 ? Integer.parseInt(args[0]) : ;
        String connectHost = args.length > 1 ? args[1] : ;
        int connectPort = args.length > 2 ? Integer.parseInt(args[2]) : ;
        ..println("Usage: Tracer [<listenport> [<connecthost> [<connectport>]]]");
        ..println("   Serially traces connections on the <listenport>, logging\n"
                         + "   frames received and passing them to the connect host and port.");
        ..println("Invoked as: Tracer " + listenPort + " " + connectHost + " " + connectPort);
        Properties props = System.getProperties();
        printBoolProperty("WITHHOLD_INBOUND_HEARTBEATS"props);
        printBoolProperty("WITHHOLD_OUTBOUND_HEARTBEATS"props);
        printBoolProperty("NO_ASSEMBLE_FRAMES"props);
        printBoolProperty("NO_DECODE_FRAMES"props);
        printBoolProperty("SUPPRESS_COMMAND_BODIES"props);
        Logger logger = new AsyncLogger(.); // initially stopped
        try {
            ServerSocket server = new ServerSocket(listenPort);
            int counter = 0;
            while (true) {
                Socket conn = server.accept();
                new Tracerconn
                          , "Tracer-" + (counter++)
                          , connectHostconnectPort
                          , logger
                          ).start();
            }
        } catch (Exception e) {
            logger.stop(); // will stop shared logger thread
            e.printStackTrace();
            System.exit(1);
        }
    }
    private final Properties props;
    private final Socket inSock;
    private final Socket outSock;
    private final String idLabel;
    private final DataInputStream iis;
    private final DataOutputStream ios;
    private final DataInputStream ois;
    private final DataOutputStream oos;
    private final Logger logger;
    private final BlockingCell<ExceptionreportEnd;
    private final AtomicBoolean started;
    private Tracer(Socket sockString idString hostint portLogger loggerBlockingCell<ExceptionreportEndProperties props)
            throws IOException {
        this. = props;
        this. = sock;
        this. = new Socket(hostport);
        this. = ": <" + id + "> ";
        this. = new DataInputStream(this..getInputStream());
        this. = new DataOutputStream(this..getOutputStream());
        this. = new DataInputStream(this..getInputStream());
        this. = new DataOutputStream(this..getOutputStream());
        this. = logger;
        this. = reportEnd;
        this. = new AtomicBoolean(false);
    }
    private Tracer(Socket sockString idString hostint portLogger logger)
            throws IOException {
        this(sockidhostportloggernew BlockingCell<Exception>(), System.getProperties());
    }
    private Tracer(int listenPortString idString hostint port,
            Logger loggerBlockingCell<ExceptionreportEndProperties props)
            throws IOException {
        this(new ServerSocket(listenPort).accept(), idhostportloggerreportEndprops);
    }
    public Tracer(int listenPortString idString hostint portLogger loggerProperties propsthrows IOException {
        this(listenPortidhostportloggernew BlockingCell<Exception>(), props);
    }
    public Tracer(String idthrows IOException {
    }
    public Tracer(String idProperties propsthrows IOException {
    }
    public void start() {
        if (this..compareAndSet(falsetrue)) {
            this..start();
            new Thread(this).start();
        }
    }
    public void run() {
        try {
            byte[] handshake = new byte[8];
            this..readFully(handshake);
            this..write(handshake);
            BlockingCell<Exceptionwio = new BlockingCell<Exception>();
            new Thread(new DirectionHandler(wiotruethis.this.this.))
                    .start();
            new Thread(new DirectionHandler(wiofalsethis.this.this.))
                    .start();
            waitAndLogException(wio); // either stops => we stop
        } catch (Exception e) {
            reportAndLogNonNullException(e);
        } finally {
            try { this..close(); } catch (Exception e) { logException(e); }
            try { this..close(); } catch (Exception e) { logException(e); }
            this..setIfUnset(null);
            this..stop();
        }
    }
    private void waitAndLogException(BlockingCell<Exceptionbc) {
    }
    private void reportAndLogNonNullException(Exception e) {
        if (e!=null) {
            this..setIfUnset(e);
            logException(e);
        }
    }
    public void log(String message) {
        StringBuilder sb = new StringBuilder();
        this..log(sb.append(System.currentTimeMillis())
                          .append(this.)
                          .append(message)
                        .toString());
    }
    public void logException(Exception e) {
        log("uncaught " + Utility.makeStackTrace(e));
    }
    private class DirectionHandler implements Runnable {
        private final BlockingCell<ExceptionwaitCell;
        private final boolean silentMode;
        private final boolean noDecodeFrames;
        private final boolean noAssembleFrames;
        private final boolean suppressCommandBodies;
        private final boolean writeHeartBeats;
        private final String directionIndicator;
        private final DataInputStream inStream;
        private final DataOutputStream outStream;
        private final Map<IntegerAMQCommandcommands;
        public DirectionHandler(BlockingCell<ExceptionwaitCellboolean inBound,
                DataInputStream inStreamDataOutputStream outStreamProperties props) {
            this. = waitCell;
            this. = getBoolProperty("SILENT_MODE"props);
            this. = getBoolProperty("NO_DECODE_FRAMES"props);
            this. = getBoolProperty("NO_ASSEMBLE_FRAMES"props);
            this. = getBoolProperty("SUPPRESS_COMMAND_BODIES"props);
            this.
               = ( inBound && !getBoolProperty("WITHHOLD_INBOUND_HEARTBEATS"props))
              || (!inBound && !getBoolProperty("WITHHOLD_OUTBOUND_HEARTBEATS"props));
            this. = (inBound ? " -> " : " <- ");
            this. = inStream;
            this. = outStream;
            this. = new HashMap<IntegerAMQCommand>();
        }
        private Frame readFrame() throws IOException {
            return Frame.readFrom(this.);
        }
        private void report(int channelObject object) {
            StringBuilder sb = new StringBuilder("ch#").append(channel)
                    .append(this.).append(object);
            Tracer.this.log(sb.toString());
        }
        private void reportFrame(Frame framethrows IOException {
            switch (frame.type) {
            case .: {
                report(frame.channel, AMQImpl.readMethodFrom(frame.getInputStream()));
                break;
            }
            case .: {
                AMQContentHeader contentHeader = AMQImpl
                        .readContentHeaderFrom(frame.getInputStream());
                long remainingBodyBytes = contentHeader.getBodySize();
                StringBuilder sb = new StringBuilder("Expected body size: ")
                        .append(remainingBodyBytes).append("; ")
                        .append(contentHeader);
                report(frame.channelsb);
                break;
            }
            default:
                report(frame.channelframe);
            }
        }
        private void doFrame() throws IOException {
            Frame frame = readFrame();
            if (frame != null) {
                if (this.) {
                    frame.writeTo(this.);
                    return;
                }
                if (frame.type == .) {
                    if (this.) {
                        frame.writeTo(this.);
                        report(frame.channelframe);
                    } else {
                        report(frame.channel"(withheld) " + frame.toString());
                    }
                } else {
                    frame.writeTo(this.);
                    if (this.) {
                        report(frame.channelframe);
                    } else if (this.) {
                        reportFrame(frame);
                    } else {
                        AMQCommand cmd = this..get(frame.channel);
                        if (cmd == null) {
                            cmd = new AMQCommand();
                            this..put(frame.channelcmd);
                        }
                        if (cmd.handleFrame(frame)) {
                            report(frame.channelcmd.toString(this.));
                            .remove(frame.channel);
                        }
                    }
                }
            }
        }
        public void run() {
            try {
                while (true) {
                    doFrame();
                }
            } catch (Exception e) {
                this..setIfUnset(e);
            } finally {
                this..setIfUnset(null);
            }
        }
    }

    
Logging strings to an outputStream. Logging may be started and stopped.
    public interface Logger {
        
Start logging, that is, printing log entries written using log(java.lang.String). Multiple successive starts are equivalent to a single start.

Returns:
true if start actually started the logger; false otherwise.
        boolean start();

        
Stop logging, that is, stop printing log entries written using log(java.lang.String). Flush preceding writes. The logger can only be stopped if started. Multiple successive stops are equivalent to a single stop.

Returns:
true if stop actually stopped the logger; false otherwise.
        boolean stop();

        
Write msg to the log. This may block, and may block indefinitely if the logger is stopped.

Parameters:
msg
        void log(String msg);
    }

    
A Tracer.Logger designed to print java.lang.Strings to a designated java.io.OutputStream on a private thread.

java.lang.Strings are read from a private queue and printed to a buffered java.io.PrintStream which is periodically flushed, determined by a flushInterval.

When instantiated the private queue is created (an in-memory java.util.concurrent.ArrayBlockingQueue in this implementation) and when start()ed the private thread is created and started unless it is already present. An Tracer.AsyncLogger may be started many times, but only one thread is created.

When stop()ed either the number of starts is decremented, or, if this count reaches zero, a special element is queued which causes the private thread to end when encountered.

If the private thread is interrupted, the thread will also end, and the count set to zero, This will cause subsequent stop()s to be ignored, and the next start() will create a new thread.

log(java.lang.String) never blocks unless the private queue is full; this may never un-block if the Tracer.Logger is stopped.

    public static class AsyncLogger implements Logger {
        private static final int MIN_FLUSH_INTERVAL = 100;
        private static final int ONE_SECOND_INTERVAL = 1000;
        private static final int LOG_QUEUE_SIZE = 1024 * 1024;
        private static final int BUFFER_SIZE = 10 * 1024 * 1024;
        private final Runnable loggerRunnable;
        private final SafeCounter countStarted;
        private volatile Thread loggerThread = null;

        
Simple pair class for queue elements.

Parameters:
<L> type of left item
<R> type of right item
        private static class Pr<L,R> {
            private final L left;
            private final R right;
            public L left() { return this.; }
            public R right() { return this.; }
            public Pr(L left, R right) { this.=leftthis.=right; }
        }
        private enum LogCmd {
            STOP,
            PRINT
        }
        private final BlockingQueue<Pr<StringLogCmd> > queue = new ArrayBlockingQueue<Pr<StringLogCmd> >(
                true);

        
Same as AsyncLogger(os, flushInterval) with a one-second flush interval.

Parameters:
os OutputStream to print to.
        public AsyncLogger(OutputStream os) {
            this(os);
        }

        
Start/stoppable logger that prints to an java.io.OutputStream with flushes every flushInterval milliseconds.

Parameters:
os OutputStream to print to.
flushInterval in milliseconds, time between flushes.
        public AsyncLogger(OutputStream osint flushInterval) {
            if (flushInterval < )
                throw new IllegalArgumentException("Flush interval ("
                        + flushInterval + "ms) must be positive and at least "
                        +  + "ms.");
            this. = new SafeCounter();
            PrintStream printStream = new PrintStream(new BufferedOutputStream(
                    os), false);
            this. = new AsyncLoggerRunnable(printStream,
                    flushIntervalthis.);
        }
        public void log(String message) {
            if (message != null) {
                try {
                    this..put(new Pr<StringLogCmd>(message.));
                } catch (InterruptedException ie) {
                    throw new RuntimeException("Interrupted while logging."ie);
                }
            }
        }
        public boolean start() {
            if (this..testZeroAndIncrement()) {
                this. = new Thread(this.);
                this..start();
                return true;
            }
            return false// meaning already started
        }
        public boolean stop() {
            if (this..decrementAndTestZero()) {
                if (this. != null) {
                    try {
                        this..put(new Pr<StringLogCmd>(null.));
                    } catch (InterruptedException ie) {
                        this..interrupt();  //try harder
                        throw new RuntimeException("Interrupted while stopping."ie);
                    }
                    this. = null;
                }
                return true;
            }
            return false// meaning already stopped
        }
        private class AsyncLoggerRunnable implements Runnable {
            private final int flushInterval;
            private final PrintStream ps;
            private final BlockingQueue<Pr<StringLogCmd> > queue;
            public AsyncLoggerRunnable(PrintStream psint flushInterval,
                    BlockingQueue<Pr<StringLogCmd> > queue) {
                this. = flushInterval;
                this. = ps;
                this. = queue;
            }
            public void run() {
                try {
                    long timeOfNextFlush = System.currentTimeMillis()
                            + this.;
                    boolean printedSinceLastFlush = false;
                    while (true) {
                        long timeToNextFlush;
                        while (0 >= (timeToNextFlush = timeOfNextFlush
                                - System.currentTimeMillis())) {
                            if (printedSinceLastFlush) {
                                this..flush();
                                printedSinceLastFlush = false;
                            }
                            timeOfNextFlush += this.;
                        }
                        Pr<StringLogCmditem = this..poll(timeToNextFlush,
                                .);
                        if (item != null) {
                            if (item.left() != null) {
                                this..println(item.left());
                                printedSinceLastFlush = true;
                            }
                            if (item.right() == .break;
                        }
                    }
                    drainCurrentQueue();
                    this..println("Stopped.");
                    this..flush();
                } catch (InterruptedException ie) {
                    AsyncLogger.this..reset();
                    drainCurrentQueue();
                    this..println("Interrupted.");
                    this..flush();
                }
            }
            private void drainCurrentQueue() {
                int currentSize = this..size();
                while (currentSize-- > 0) {
                    Pr<StringLogCmditem = this..poll();
                    if (item != null && item.left() != null)
                        this..println(item.left());
                }
            }
        }
    }
    
    private static class SafeCounter {
        private final Object countMonitor = new Object();
        private int count;
        public SafeCounter() {
            this. = 0;
        }
        public boolean testZeroAndIncrement() {
            synchronized (this.) {
                int val = this.;
                this.++;
                return (val == 0);
            }
        }
        public boolean decrementAndTestZero() {
            synchronized (this.) {
                if (this. == 0) return false;
                --this.;
                return (0 == this.);
            }
        }
        public void reset() {
            synchronized (this.) {
                this. = 0;
            }
        }
    }
New to GrepCode? Check out our FAQ X