Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  //  The contents of this file are subject to the Mozilla Public License
  //  Version 1.1 (the "License"); you may not use this file except in
  //  compliance with the License. You may obtain a copy of the License
  //  at http://www.mozilla.org/MPL/
  //
  //  Software distributed under the License is distributed on an "AS IS"
  //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  //  the License for the specific language governing rights and
  //  limitations under the License.
 //
 //  The Original Code is RabbitMQ.
 //
 //  The Initial Developer of the Original Code is VMware, Inc.
 //  Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
 //
 
 
 package com.rabbitmq.client.impl;
 
 import java.util.Map;
 
Concrete class representing and managing an AMQP connection to a broker.

To create a broker connection, use com.rabbitmq.client.ConnectionFactory. See com.rabbitmq.client.Connection for an example.

 
 public class AMQConnection extends ShutdownNotifierComponent implements Connection {
    
Timeout used while waiting for AMQP handshaking to complete (milliseconds)
 
     public static final int HANDSHAKE_TIMEOUT = 10000;

    
Retrieve a copy of the default table of client properties that will be sent to the server during connection startup. This method is called when each new ConnectionFactory instance is constructed.

Returns:
a map of client properties
See also:
com.rabbitmq.client.Connection.getClientProperties()
 
     /**
      * Builds the default client-properties table: a nested "capabilities" map with four
      * flags set to true, plus product, version, platform, copyright and information
      * entries wrapped as long strings, assembled through buildTable.
      *
      * NOTE(review): this listing has lost punctuation (commas, the generic type
      * arguments) and the argument of the "version" entry; restore them from the
      * original source before compiling.
      */
     public static final Map<StringObjectdefaultClientProperties() {
         // Capability flags placed in the table under the "capabilities" key.
         Map<StringObjectcapabilities = new HashMap<StringObject>();
         capabilities.put("publisher_confirms"true);
         capabilities.put("exchange_exchange_bindings"true);
         capabilities.put("basic.nack"true);
         capabilities.put("consumer_cancel_notify"true);
         // Alternating key/value array, the layout buildTable expects.
         return buildTable(new Object[] {
                 "product", LongStringHelper.asLongString("RabbitMQ"),
                 // NOTE(review): the version expression is missing here (only "." survives).
                 "version", LongStringHelper.asLongString(.),
                 "platform", LongStringHelper.asLongString("Java"),
                 "copyright", LongStringHelper.asLongString(
                     "Copyright (C) 2007-2011 VMware, Inc."),
                 "information", LongStringHelper.asLongString(
                     "Licensed under the MPL. See http://www.rabbitmq.com/"),
                 "capabilities"capabilities
             });
     }
 
     // Version object describing this client.
     // NOTE(review): presumably the value compared with the server's version in start() — confirm.
     // NOTE(review): the constructor arguments are missing in this listing (only "...."
     // survives); restore them from the original source.
     private static final Version clientVersion =
         new Version(....);

    
The special channel 0 (not managed by the _channelManager)
 
     private final AMQChannel _channel0 = new AMQChannel(this, 0) {
         @Override public boolean processAsync(Command cthrows IOException {
             return getConnection().processControlCommand(c);
         }
     };
 
     private final ConsumerWorkService _workService;

    
Frame source/sink
 
     private final FrameHandler _frameHandler;

    
Flag controlling the main driver loop's termination
 
     private volatile boolean _running = false;

    
Handler for (uncaught) exceptions that crop up in the AMQConnection.MainLoop.
    private final ExceptionHandler _exceptionHandler;

    
Object used for blocking main application thread when doing all the necessary connection shutdown operations
    private final BlockingCell<Object_appContinuation = new BlockingCell<Object>();

    
Flag indicating whether the client received Connection.Close message from the broker
    private volatile boolean _brokerInitiatedShutdown;

    
Manages heart-beat sending for this connection
    private final HeartbeatSender _heartbeatSender;
    private final String _virtualHost;
    private final Map<StringObject_clientProperties;
    private final SaslConfig saslConfig;
    private final int requestedHeartbeat;
    private final int requestedChannelMax;
    private final int requestedFrameMax;
    private final String username;
    private final String password;
    /* State modified after start - all volatile */

    
Maximum frame length, or zero if no limit is set
    private volatile int _frameMax = 0;
    
Count of socket-timeouts that have happened without any incoming frames
    private volatile int _missedHeartbeats = 0;
    
Currently-configured heart-beat interval, in seconds. 0 meaning none.
    private volatile int _heartbeat = 0;
    
Object that manages a set of channels
    private volatile ChannelManager _channelManager;
    
Saved server properties field from connection.start
    private volatile Map<StringObject_serverProperties;

    
Protected API - respond, in the driver thread, to a ShutdownSignal by releasing the given channel's number back to the channel manager.

Parameters:
channel the channel to disconnect
    /**
     * Releases the channel number held by the given channel, provided a channel
     * manager is available; does nothing otherwise.
     *
     * @param channel the channel to disconnect
     */
    public final void disconnectChannel(ChannelN channel) {
        // NOTE(review): the initializer is missing in this listing; presumably the
        // _channelManager field copied into a local — confirm against the original.
        ChannelManager cm = ;
        if (cm != null)
            cm.releaseChannelNumber(channel);
    }
    private final void ensureIsOpen()
        throws AlreadyClosedException
    {
        if (!isOpen()) {
            throw new AlreadyClosedException("Attempt to use closed connection"this);
        }
    }

    
    /**
     * Returns the address reported by the delegate's getAddress().
     * NOTE(review): the receiver of the call is missing in this listing
     * (presumably _frameHandler) — confirm against the original.
     */
    public InetAddress getAddress() {
        return .getAddress();
    }

    
    /**
     * Returns the port reported by the delegate's getPort().
     * NOTE(review): the receiver of the call is missing in this listing
     * (presumably _frameHandler) — confirm against the original.
     */
    public int getPort() {
        return .getPort();
    }
    /**
     * Returns a FrameHandler.
     * NOTE(review): the returned expression is missing in this listing
     * (presumably the _frameHandler field) — confirm against the original.
     */
    public FrameHandler getFrameHandler(){
        return ;
    }

    
    /**
     * Returns a map of server properties.
     * NOTE(review): the returned expression and the generic type arguments are
     * missing in this listing (presumably the _serverProperties field, typed
     * Map of String to Object) — confirm against the original.
     */
    public Map<StringObjectgetServerProperties() {
        return ;
    }

    
Construct a new connection using a default ExceptionHandler

Parameters:
username name used to establish connection
password for username
frameHandler for sending and receiving frames on this connection
executor thread pool service for consumer threads for channels on this connection
virtualHost
clientProperties client info used in negotiating with the server
requestedFrameMax max size of frame offered
requestedChannelMax max number of channels offered
requestedHeartbeat heart-beat in seconds offered
saslConfig sasl configuration hook
    public AMQConnection(String username,
                         String password,
                         FrameHandler frameHandler,
                         ExecutorService executor,
                         String virtualHost,
                         Map<StringObjectclientProperties,
                         int requestedFrameMax,
                         int requestedChannelMax,
                         int requestedHeartbeat,
                         SaslConfig saslConfig)
    {
        this(username,
             password,
             frameHandler,
             executor,
             virtualHost,
             clientProperties,
             requestedFrameMax,
             requestedChannelMax,
             requestedHeartbeat,
             saslConfig,
             new DefaultExceptionHandler());
    }

    
Construct a new connection

Parameters:
username name used to establish connection
password for username
frameHandler for sending and receiving frames on this connection
executor thread pool service for consumer threads for channels on this connection
virtualHost
clientProperties client info used in negotiating with the server
requestedFrameMax max size of frame offered
requestedChannelMax max number of channels offered
requestedHeartbeat heart-beat in seconds offered
saslConfig sasl configuration hook
execeptionHandler handler for exceptions using this connection
    /**
     * Primary constructor: runs the static precondition check, then stores the
     * supplied configuration and creates the consumer work service and the
     * heartbeat sender. No network traffic is started here; see start().
     *
     * NOTE(review): every assignment target after "this." and the generic type
     * arguments are missing in this listing; the targets are presumably the
     * like-named fields declared above — confirm against the original.
     */
    public AMQConnection(String username,
                         String password,
                         FrameHandler frameHandler,
                         ExecutorService executor,
                         String virtualHost,
                         Map<StringObjectclientProperties,
                         int requestedFrameMax,
                         int requestedChannelMax,
                         int requestedHeartbeat,
                         SaslConfig saslConfig,
                         ExceptionHandler execeptionHandler)
    {
        checkPreconditions();
        this. = username;
        this. = password;
        this. = frameHandler;
        this. = virtualHost;
        this. = execeptionHandler;
        // Defensive copy: later changes to the caller's map do not affect this connection.
        this. = new HashMap<StringObject>(clientProperties);
        this. = requestedFrameMax;
        this. = requestedChannelMax;
        this. = requestedHeartbeat;
        this. = saslConfig;
        this.  = new ConsumerWorkService(executor);
        // NOTE(review): presumably the channel manager, which start() creates after tuning — confirm.
        this. = null;
        this. = new HeartbeatSender(frameHandler);
        // NOTE(review): presumably the broker-initiated-shutdown flag — confirm.
        this. = false;
    }

    
Start up the connection, including the MainLoop thread. Sends the protocol version negotiation header, and runs through Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then calls Connection.Open and waits for the OpenOk. Sets heart-beat and frame max values after tuning has taken place.

Throws:
java.io.IOException if an error is encountered; sub-classes com.rabbitmq.client.ProtocolVersionMismatchException and com.rabbitmq.client.PossibleAuthenticationFailureException will be thrown in the corresponding circumstances.
    /**
     * Performs the connection handshake: enqueues a continuation for the server's
     * Connection.Start, sends the protocol header, starts the MainLoop thread,
     * checks the server's protocol version, runs the SASL challenge/response loop
     * until Connection.Tune arrives, and negotiates channel-max, frame-max and
     * heartbeat from the requested and server-offered values.
     *
     * NOTE(review): many receivers and arguments are missing in this listing, and
     * the opening of the last two statements is lost entirely; the code below is
     * reproduced as found and must be restored from the original source.
     *
     * @throws IOException on I/O failure, version mismatch, absence of a compatible
     *         authentication mechanism, or shutdown during authentication
     */
    public void start()
        throws IOException
    {
        // NOTE(review): presumably sets the _running flag read by the main loop — confirm.
        this. = true;
        // Make sure that the first thing we do is to send the header,
        // which should cause any socket errors to show up for us, rather
        // than risking them pop out in the MainLoop
        AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
            new AMQChannel.SimpleBlockingRpcContinuation();
        // We enqueue an RPC continuation here without sending an RPC
        // request, since the protocol specifies that after sending
        // the version negotiation header, the client (connection
        // initiator) is to wait for a connection.start method to
        // arrive.
        .enqueueRpc(connStartBlocker);
        // The following two lines are akin to AMQChannel's
        // transmit() method for this pseudo-RPC.
        .sendHeader();
        // start the main loop going
        Thread ml = new MainLoop();
        ml.setName("AMQP Connection " + getHostAddress() + ":" + getPort());
        ml.start();
        AMQP.Connection.Start connStart = null;
        try {
            // Blocks until the main loop delivers the server's Connection.Start.
            connStart =
                (AMQP.Connection.StartconnStartBlocker.getReply().getMethod();
            // Server properties are stored behind an unmodifiable view.
             = Collections.unmodifiableMap(connStart.getServerProperties());
            Version serverVersion =
                new Version(connStart.getVersionMajor(),
                            connStart.getVersionMinor());
            if (!Version.checkVersion(serverVersion)) {
                .close(); //this will cause mainLoop to terminate
                throw new ProtocolVersionMismatchException(,
                                                           serverVersion);
            }
        } catch (ShutdownSignalException sse) {
            throw AMQChannel.wrap(sse);
        }
        // The server advertises its mechanisms as one space-separated string.
        String[] mechanisms = connStart.getMechanisms().toString().split(" ");
        SaslMechanism sm = this..getSaslMechanism(mechanisms);
        if (sm == null) {
            throw new IOException("No compatible authentication mechanism found - " +
                    "server offered [" + connStart.getMechanisms() + "]");
        }
        LongString challenge = null;
        // Initial response is computed with a null challenge.
        LongString response = sm.handleChallenge(nullthis.this.);
        AMQP.Connection.Tune connTune = null;
        // First iteration sends StartOk; each later one answers a Secure challenge
        // with SecureOk. The loop ends when the server replies with Tune.
        do {
            Method method = (challenge == null)
                ? new AMQP.Connection.StartOk.Builder()
                                .clientProperties()
                                .mechanism(sm.getName())
                                .response(response)
                      .build()
                : new AMQP.Connection.SecureOk.Builder().response(response).build();
            try {
                Method serverResponse = .rpc(method).getMethod();
                if (serverResponse instanceof AMQP.Connection.Tune) {
                    connTune = (AMQP.Connection.TuneserverResponse;
                } else {
                    challenge = ((AMQP.Connection.SecureserverResponse).getChallenge();
                    response = sm.handleChallenge(challengethis.this.);
                }
            } catch (ShutdownSignalException e) {
                // A shutdown at this stage is reported as a possible authentication failure.
                throw new PossibleAuthenticationFailureException(e);
            }
        } while (connTune == null);
        int channelMax =
            negotiatedMaxValue(this.,
                               connTune.getChannelMax());
         = new ChannelManager(this.channelMax);
        int frameMax =
            negotiatedMaxValue(this.,
                               connTune.getFrameMax());
        this. = frameMax;
        int heartbeat =
            negotiatedMaxValue(this.,
                               connTune.getHeartbeat());
        setHeartbeat(heartbeat);
        // NOTE(review): the openings of the next two statements are missing. Going by
        // the method description they presumably send Connection.TuneOk with the
        // negotiated values and then perform the Connection.Open RPC — confirm.
                            .channelMax(channelMax)
                            .frameMax(frameMax)
                            .heartbeat(heartbeat)
                          .build());
                                  .virtualHost()
                                .build());
        return;
    }

    
Private API - check required preconditions and protocol invariants
    private static final void checkPreconditions() {
        AMQCommand.checkPreconditions();
    }

    
    /**
     * Returns the channel manager's channel-max value, or 0 when no channel
     * manager is available.
     */
    public int getChannelMax() {
        // NOTE(review): the initializer is missing in this listing (presumably the
        // _channelManager field); the closing parenthesis of the null test is lost too.
        ChannelManager cm = ;
        if (cm == nullreturn 0;
        return cm.getChannelMax();
    }

    
    /**
     * Returns the frame-max value.
     * NOTE(review): the returned expression is missing in this listing
     * (presumably the _frameMax field) — confirm against the original.
     */
    public int getFrameMax() {
        return ;
    }

    
    /**
     * Returns the heartbeat value.
     * NOTE(review): the returned expression is missing in this listing
     * (presumably the _heartbeat field) — confirm against the original.
     */
    public int getHeartbeat() {
        return ;
    }

    
Protected API - set the heartbeat timeout. Should only be called during tuning.
    /**
     * Applies a heartbeat interval: passes it to a setHeartbeat delegate, records
     * it, and sets a timeout of one quarter of the interval (converted from
     * seconds to milliseconds). A SocketException is caught and ignored.
     *
     * NOTE(review): the receivers of both calls and the assignment target are
     * missing in this listing (presumably _heartbeatSender, _heartbeat and
     * _frameHandler respectively) — confirm against the original.
     *
     * @param heartbeat the heartbeat interval, in seconds
     */
    public void setHeartbeat(int heartbeat) {
        try {
            .setHeartbeat(heartbeat);
             = heartbeat;
            // Divide by four to make the maximum unwanted delay in
            // sending a timeout be less than a quarter of the
            // timeout setting.
            .setTimeout(heartbeat * 1000 / 4);
        } catch (SocketException se) {
            // should do more here?
        }
    }
    /**
     * Returns a newly created HashMap on every call, so callers never receive a
     * reference to internal state.
     * NOTE(review): as listed the map is constructed with no arguments; the
     * original presumably copies the stored client properties, and the generic
     * type arguments are garbled — confirm against the original.
     */
    public Map<StringObjectgetClientProperties() {
        return new HashMap<StringObject>();
    }

    
Protected API - retrieve the current ExceptionHandler
        return ;
    }

    
Public API -
    /**
     * Creates a channel with the requested channel number through the channel
     * manager. Throws via ensureIsOpen() when the connection is closed, and
     * returns null when no channel manager is available.
     *
     * NOTE(review): punctuation in the signature and in the createChannel call is
     * lost in this listing, and the initializer of cm is missing (presumably the
     * _channelManager field) — confirm against the original.
     */
    public Channel createChannel(int channelNumberthrows IOException {
        ensureIsOpen();
        ChannelManager cm = ;
        if (cm == nullreturn null;
        return cm.createChannel(thischannelNumber);
    }

    
Public API -
    /**
     * Creates a channel through the channel manager, which picks the channel
     * number. Throws via ensureIsOpen() when the connection is closed, and returns
     * null when no channel manager is available.
     *
     * NOTE(review): the initializer of cm is missing in this listing (presumably
     * the _channelManager field) — confirm against the original.
     */
    public Channel createChannel() throws IOException {
        ensureIsOpen();
        ChannelManager cm = ;
        if (cm == nullreturn null;
        return cm.createChannel(this);
    }

    
Public API - sends a frame directly to the broker.
    /**
     * Hands the frame to a delegate's writeFrame.
     * NOTE(review): the receiver of the call is missing in this listing
     * (presumably _frameHandler), as is the closing parenthesis of the parameter
     * list — confirm against the original.
     */
    public void writeFrame(Frame fthrows IOException {
        .writeFrame(f);
    }
    private static final int negotiatedMaxValue(int clientValueint serverValue) {
        return (clientValue == 0 || serverValue == 0) ?
            Math.max(clientValueserverValue) :
            Math.min(clientValueserverValue);
    }
    /**
     * Reader thread. Repeatedly reads a frame; a null frame is treated as a socket
     * timeout, a heartbeat frame is ignored, a channel-0 frame goes to the special
     * channel, and any other frame goes to the channel it names while the
     * connection is open. On exit the finally block closes a resource, sets a
     * cell to null and notifies listeners.
     *
     * NOTE(review): the loop condition, several receivers and assignment targets,
     * and the start of a statement in the Throwable handler are missing in this
     * listing — restore them from the original source.
     */
    private class MainLoop extends Thread {

        
Channel reader thread main loop. Reads a frame, and if it is not a heartbeat frame, dispatches it to the channel it refers to. Continues running until the "running" flag is set false by shutdown().
        @Override public void run() {
            try {
                // NOTE(review): loop condition missing (presumably the _running flag).
                while () {
                    Frame frame = .readFrame();
                    if (frame != null) {
                        // Any frame counts as liveness; NOTE(review): the target is
                        // presumably the missed-heartbeat counter.
                         = 0;
                        if (frame.type == .) {
                            // Ignore it: we've already just reset the heartbeat counter.
                        } else {
                            if (frame.channel == 0) { // the special channel
                                .handleFrame(frame);
                            } else {
                                if (isOpen()) {
                                    // If we're still _running, but not isOpen(), then we
                                    // must be quiescing, which means any inbound frames
                                    // for non-zero channels (and any inbound commands on
                                    // channel zero that aren't Connection.CloseOk) must
                                    // be discarded.
                                    ChannelManager cm = ;
                                    if (cm != null) {
                                        cm.getChannel(frame.channel).handleFrame(frame);
                                    }
                                }
                            }
                        }
                    } else {
                        // Socket timeout waiting for a frame.
                        // Maybe missed heartbeat.
                        handleSocketTimeout();
                    }
                }
            } catch (EOFException ex) {
                // NOTE(review): the guard expression is missing (presumably the
                // broker-initiated-shutdown flag).
                if (!)
                    shutdown(exfalseextrue);
            } catch (Throwable ex) {
                // NOTE(review): the start of the statement below is missing
                // (presumably a call on the exception handler).
                                                                            ex);
                shutdown(exfalseextrue);
            } finally {
                // Finally, shut down our underlying data connection.
                .close();
                .set(null);
                notifyListeners();
            }
        }
    }

    
Called when a frame-read operation times out

Throws:
com.rabbitmq.client.MissedHeartbeatException if heart-beats have been missed
    /**
     * Counts a read timeout. Returns immediately when a value compared with 0 is
     * zero (heart-beating disabled); otherwise increments a counter and throws
     * once it exceeds 8.
     *
     * NOTE(review): the compared value, the incremented counter and the value
     * concatenated into the message are missing in this listing (presumably
     * _heartbeat, _missedHeartbeats and _heartbeat) — confirm against the original.
     *
     * @throws MissedHeartbeatException when more than 8 timeouts have been counted
     */
    private void handleSocketTimeout() throws MissedHeartbeatException {
        if ( == 0) { // No heart-beating
            return;
        }
        // We check against 8 = 2 * 4 because we need to wait for at
        // least two complete heartbeat setting intervals before
        // complaining, and we've set the socket timeout to a quarter
        // of the heartbeat setting in setHeartbeat above.
        if (++ > (2 * 4)) {
            throw new MissedHeartbeatException("Heartbeat missing with heartbeat = " +
                                                + " seconds");
        }
    }

    
Handles incoming control commands on channel zero.

    /**
     * Handles a command received on channel zero.
     * While open: a Connection.Close is passed to handleConnectionClose and true is
     * returned; any other method returns false.
     * While not open: a Connection.Close is answered with CloseOk (I/O errors
     * ignored) and returns true; a Connection.CloseOk clears a flag and returns
     * true only when no RPC is outstanding; everything else returns true.
     *
     * NOTE(review): the receivers of quiescingTransmit and isOutstandingRpc
     * (presumably _channel0) and the flag assigned false (presumably _running) are
     * missing in this listing, as is the closing parenthesis of the parameter list.
     */
    public boolean processControlCommand(Command cthrows IOException
    {
        // Similar trick to ChannelN.processAsync used here, except
        // we're interested in whole-connection quiescing.
        // See the detailed comments in ChannelN.processAsync.
        Method method = c.getMethod();
        if (isOpen()) {
            if (method instanceof AMQP.Connection.Close) {
                handleConnectionClose(c);
                return true;
            } else {
                return false;
            }
        } else {
            if (method instanceof AMQP.Connection.Close) {
                // Already shutting down, so just send back a CloseOk.
                try {
                    .quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
                } catch (IOException ioe) {
                    // Deliberately ignored.
                    Utility.emptyStatement();
                }
                return true;
            } else if (method instanceof AMQP.Connection.CloseOk) {
                // It's our final "RPC". Time to shut down.
                 = false;
                // If Close was sent from within the MainLoop we
                // will not have a continuation to return to, so
                // we treat this as processed in that case.
                return !.isOutstandingRpc();
            } else { // Ignore all others.
                return true;
            }
        }
    }
    /**
     * Reacts to a close command: runs shutdown with the command as the reason,
     * sets a flag to true, and starts a SocketCloseWait thread named after the
     * peer's host address and port.
     *
     * NOTE(review): the body of the try block is empty in this listing (going by
     * the catch clause it presumably sent a reply that can throw IOException), the
     * flag assigned true is missing (presumably the broker-initiated-shutdown
     * flag), and the shutdown arguments have lost their separators.
     */
    public void handleConnectionClose(Command closeCommand) {
        ShutdownSignalException sse = shutdown(closeCommandfalsenullfalse);
        try {
        } catch (IOException ioe) {
            // Deliberately ignored.
            Utility.emptyStatement();
        }
         = true;
        Thread scw = new SocketCloseWait(sse);
        scw.setName("AMQP Connection Closing Monitor " +
                getHostAddress() + ":" + getPort());
        scw.start();
    }
    /**
     * Helper thread that blocks on an uninterruptible get, then — whether or not
     * the wait completes normally — clears a flag and notifies an outstanding RPC.
     *
     * NOTE(review): the constructor's assignment target (presumably the cause
     * field), the receivers of uninterruptibleGet and notifyOutstandingRpc, the
     * flag set to false, and any argument to notifyOutstandingRpc are missing in
     * this listing — restore them from the original source.
     */
    private class SocketCloseWait extends Thread {
        // Shutdown signal supplied when the thread is created.
        private ShutdownSignalException cause;
        public SocketCloseWait(ShutdownSignalException sse) {
             = sse;
        }
        @Override public void run() {
            try {
                .uninterruptibleGet();
            } finally {
                 = false;
                .notifyOutstandingRpc();
            }
        }
    }

    
Protected API - causes all attached channels to terminate with a ShutdownSignal built from the argument, and stops this connection from accepting further work from the application.

Returns:
a shutdown signal built using the given arguments
    /**
     * Builds a ShutdownSignalException from the arguments, records it as the
     * shutdown cause if the connection was still open, then calls shutdown() and
     * processShutdownSignal on two collaborators and passes the signal to the
     * channel manager when one exists.
     *
     * If the connection was already closed and the shutdown was initiated by the
     * application, AlreadyClosedException is thrown; when not application-initiated
     * the method carries on regardless.
     *
     * NOTE(review): argument separators, a closing parenthesis, the receivers of
     * shutdown() and processShutdownSignal (presumably _heartbeatSender and
     * _channel0) and the initializer of cm are missing in this listing.
     *
     * @return the shutdown signal that was built
     */
    public ShutdownSignalException shutdown(Object reason,
                         boolean initiatedByApplication,
                         Throwable cause,
                         boolean notifyRpc)
    {
        ShutdownSignalException sse = new ShutdownSignalException(true,initiatedByApplication,
                                                                  reasonthis);
        sse.initCause(cause);
        if (!setShutdownCauseIfOpen(sse)) {
            if (initiatedByApplication
                throw new AlreadyClosedException("Attempt to use closed connection"this);
        }
        // stop any heartbeating
        .shutdown();
        .processShutdownSignal(sse, !initiatedByApplicationnotifyRpc);
        ChannelManager cm = ;
        if (cm != nullcm.handleSignal(sse);
        return sse;
    }

    
Public API -
    public void close()
        throws IOException
    {
        close(-1);
    }

    
Public API -
    /**
     * Closes the connection with the reply text "OK" and the given timeout by
     * delegating to the three-argument close.
     *
     * NOTE(review): the close-code argument is missing in this listing (only "."
     * survives; presumably a success reply-code constant), as are the argument
     * separators — confirm against the original.
     */
    public void close(int timeout)
        throws IOException
    {
        close(."OK"timeout);
    }

    
Public API -
    public void close(int closeCodeString closeMessage)
        throws IOException
    {
        close(closeCodecloseMessage, -1);
    }

    
Public API -
    public void close(int closeCodeString closeMessageint timeout)
        throws IOException
    {
        close(closeCodecloseMessagetruenulltimeoutfalse);
    }

    
Public API -
    public void abort()
    {
        abort(-1);
    }

    
Public API -
    public void abort(int closeCodeString closeMessage)
    {
       abort(closeCodecloseMessage, -1);
    }

    
Public API -
    /**
     * Aborts the connection with the reply text "OK" and the given timeout by
     * delegating to the three-argument abort.
     *
     * NOTE(review): the close-code argument is missing in this listing (only "."
     * survives; presumably a success reply-code constant), as are the argument
     * separators — confirm against the original.
     */
    public void abort(int timeout)
    {
        abort(."OK"timeout);
    }

    
Public API -
    public void abort(int closeCodeString closeMessageint timeout)
    {
        try {
            close(closeCodecloseMessagetruenulltimeouttrue);
        } catch (IOException e) {
            Utility.emptyStatement();
        }
    }

    
Protected API - Delegates to close(int,java.lang.String,boolean,java.lang.Throwable,int,boolean), passing -1 for the timeout, and false for the abort flag.
    public void close(int closeCode,
                      String closeMessage,
                      boolean initiatedByApplication,
                      Throwable cause)
        throws IOException
    {
        close(closeCodecloseMessageinitiatedByApplicationcause, -1, false);
    }

    
Protected API - Close this connection with the given code, message, source and timeout value for all the close operations to complete. Specifies if any encountered exceptions should be ignored.
    /**
     * Core close routine. Builds a Connection.Close method from the code and
     * message, runs shutdown with it, then either performs a blocking RPC and
     * waits up to the timeout for the reply (when called from a thread other than
     * MainLoop) or just transmits the method (when called on the MainLoop thread).
     *
     * With abort set, TimeoutException, ShutdownSignalException and IOException are
     * swallowed; otherwise a timeout is rethrown as ShutdownSignalException and the
     * other two are rethrown unchanged. In the synchronous case a resource is
     * closed in the finally block.
     *
     * NOTE(review): argument separators, the receivers of quiescingRpc and
     * quiescingTransmit (presumably _channel0) and the receiver of the close() call
     * in the finally block (presumably _frameHandler) are missing in this listing.
     */
    public void close(int closeCode,
                      String closeMessage,
                      boolean initiatedByApplication,
                      Throwable cause,
                      int timeout,
                      boolean abort)
        throws IOException
    {
        // True when the caller is not the reader thread and can therefore block for a reply.
        boolean sync = !(Thread.currentThread() instanceof MainLoop);
        try {
            AMQP.Connection.Close reason =
                new AMQP.Connection.Close.Builder()
                    .replyCode(closeCode)
                    .replyText(closeMessage)
                .build();
            shutdown(reasoninitiatedByApplicationcausetrue);
            if(sync){
              AMQChannel.SimpleBlockingRpcContinuation k =
                  new AMQChannel.SimpleBlockingRpcContinuation();
              .quiescingRpc(reasonk);
              k.getReply(timeout);
            } else {
              .quiescingTransmit(reason);
            }
        } catch (TimeoutException tte) {
            if (!abort)
                throw new ShutdownSignalException(truetruettethis);
        } catch (ShutdownSignalException sse) {
            if (!abort)
                throw sse;
        } catch (IOException ioe) {
            if (!abort)
                throw ioe;
        } finally {
            if(sync.close();
        }
    }
    /**
     * Renders the connection as a string starting with "amqp://", followed by a
     * value, "@", the host address, ":" and the port, and a trailing value.
     *
     * NOTE(review): the value after "this." and the final concatenated value are
     * missing in this listing (presumably the user name and the virtual host) —
     * confirm against the original.
     */
    @Override public String toString() {
        return "amqp://" + this. + "@" + getHostAddress() + ":" + getPort() + ;
    }
    private String getHostAddress() {
        return getAddress() == null ? null : getAddress().getHostAddress();
    }

    
Utility for constructing a java.util.Map instance from an even-length array containing alternating String keys (on the even elements, starting at zero) and values (on the odd elements, starting at one).
    private static final Map<StringObjectbuildTable(Object[] keysValues) {
        Map<StringObjectresult = new HashMap<StringObject>();
        for (int index = 0; index < keysValues.lengthindex += 2) {
            String key = (StringkeysValues[index];
            Object value = keysValues[index + 1];
            result.put(keyvalue);
        }
        return result;
    }
New to GrepCode? Check out our FAQ X