// [GrepCode viewer residue, not part of the source file]
// Start line: / End line: / Snippet Preview / Snippet HTML Code / Stack Overflow Questions
  //  The contents of this file are subject to the Mozilla Public License
  //  Version 1.1 (the "License"); you may not use this file except in
  //  compliance with the License. You may obtain a copy of the License
  //  at http://www.mozilla.org/MPL/
  //
  //  Software distributed under the License is distributed on an "AS IS"
  //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  //  the License for the specific language governing rights and
  //  limitations under the License.
 //
 //  The Original Code is RabbitMQ.
 //
 //  The Initial Developer of the Original Code is VMware, Inc.
 //  Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
 //
 
 
 package com.rabbitmq.client.impl;
 
 
Base class modelling an AMQ channel. Subclasses implement com.rabbitmq.client.Channel.close() and processAsync(), and may choose to override processShutdownSignal() and rpc().

 
 public abstract class AMQChannel extends ShutdownNotifierComponent {
    
Protected; used instead of synchronizing on the channel itself, so that clients can themselves use the channel to synchronize on.
 
     protected final Object _channelMutex = new Object();

    
The connection this channel is associated with.
 
     private final AMQConnection _connection;

    
This channel's channel number.
 
     private final int _channelNumber;

    
Command being assembled
 
     private AMQCommand _command = new AMQCommand();

    
The current outstanding RPC request, if any. (Could become a queue in future.)
 
     private RpcContinuation _activeRpc = null;

    
Whether transmission of content-bearing methods should be blocked
 
     public boolean _blockContent = false;

    
Construct a channel on the given connection, with the given channel number.

Parameters:
connection the underlying connection for this channel
channelNumber the allocated reference number for this channel
 
     public AMQChannel(AMQConnection connectionint channelNumber) {
         this. = connection;
         this. = channelNumber;
     }

    
Public API - Retrieves this channel's channel number.

Returns:
the channel number
 
     public int getChannelNumber() {
         return ;
     }

    
Private API - When the Connection receives a Frame for this channel, it passes it to this method.

Parameters:
frame the incoming frame
Throws:
java.io.IOException if an error is encountered
 
     public void handleFrame(Frame framethrows IOException {
         AMQCommand command = ;
         if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line
              = new AMQCommand(); // prepare for the next one
             handleCompleteInboundCommand(command);
         }
     }

    
Placeholder until we address bug 15786 (implementing a proper exception hierarchy). In the meantime, this at least won't throw away any information from the wrapped exception.

Parameters:
ex the exception to wrap
Returns:
the wrapped exception
    public static IOException wrap(ShutdownSignalException ex) {
        IOException ioe = new IOException();
        ioe.initCause(ex);
        return ioe;
    }
    public static IOException wrap(ShutdownSignalException exString message) {
        IOException ioe = new IOException(message);
        ioe.initCause(ex);
        return ioe;
    }

    
Placeholder until we address bug 15786 (implementing a proper exception hierarchy).
    public AMQCommand exnWrappingRpc(Method m)
        throws IOException
    {
        try {
            return privateRpc(m);
        } catch (AlreadyClosedException ace) {
            // Do not wrap it since it means that connection/channel
            // was closed in some action in the past
            throw ace;
        } catch (ShutdownSignalException ex) {
            throw wrap(ex);
        }
    }

    
Private API - handle a command which has been assembled

Parameters:
command the incoming command
Throws:
java.io.IOException if there's any problem
java.io.IOException
    public void handleCompleteInboundCommand(AMQCommand commandthrows IOException {
        // First, offer the command to the asynchronous-command
        // handling mechanism, which gets to act as a filter on the
        // incoming command stream.  If processAsync() returns true,
        // the command has been dealt with by the filter and so should
        // not be processed further.  It will return true for
        // asynchronous commands (deliveries/returns/other events),
        // and false for commands that should be passed on to some
        // waiting RPC continuation.
        if (!processAsync(command)) {
            // The filter decided not to handle/consume the command,
            // so it must be some reply to an earlier RPC.
            nextOutstandingRpc().handleCommand(command);
        }
    }
    public void enqueueRpc(RpcContinuation k)
    {
        synchronized () {
            while ( != null) {
                try {
                    .wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
             = k;
        }
    }
    public boolean isOutstandingRpc()
    {
        synchronized () {
            return ( != null);
        }
    }
    {
        synchronized () {
            RpcContinuation result = ;
             = null;
            .notifyAll();
            return result;
        }
    }
    public void ensureIsOpen()
        throws AlreadyClosedException
    {
        if (!isOpen()) {
            throw new AlreadyClosedException("Attempt to use closed channel"this);
        }
    }

    
Protected API - sends a com.rabbitmq.client.Method to the broker and waits for the next in-bound Command from the broker: only for use from non-connection-MainLoop threads!
    public AMQCommand rpc(Method m)
        throws IOExceptionShutdownSignalException
    {
        return privateRpc(m);
    }
    private AMQCommand privateRpc(Method m)
        throws IOExceptionShutdownSignalException
    {
        rpc(mk);
        // At this point, the request method has been sent, and we
        // should wait for the reply to arrive.
        //
        // Calling getReply() on the continuation puts us to sleep
        // until the connection's reader-thread throws the reply over
        // the fence.
        return k.getReply();
    }
    public void rpc(Method mRpcContinuation k)
        throws IOException
    {
        synchronized () {
            ensureIsOpen();
            quiescingRpc(mk);
        }
    }
    public void quiescingRpc(Method mRpcContinuation k)
        throws IOException
    {
        synchronized () {
            enqueueRpc(k);
            quiescingTransmit(m);
        }
    }

    
Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close.

Parameters:
command the command to handle asynchronously
Returns:
true if we handled the command; otherwise the caller should consider it "unhandled"
    public abstract boolean processAsync(Command commandthrows IOException;
    @Override public String toString() {
        return "AMQChannel(" +  + "," +  + ")";
    }

    
Protected API - respond, in the driver thread, to a com.rabbitmq.client.ShutdownSignalException.

Parameters:
signal the signal to handle
ignoreClosed the flag indicating whether to ignore the AlreadyClosedException thrown when the channel is already closed
notifyRpc the flag indicating whether any remaining rpc continuation should be notified with the given signal
    public void processShutdownSignal(ShutdownSignalException signal,
                                      boolean ignoreClosed,
                                      boolean notifyRpc) {
        try {
            synchronized () {
                if (!setShutdownCauseIfOpen(signal)) {
                    if (!ignoreClosed)
                        throw new AlreadyClosedException("Attempt to use closed channel"this);
                }
                .notifyAll();
            }
        } finally {
            if (notifyRpc)
                notifyOutstandingRpc(signal);
        }
    }
    public void notifyOutstandingRpc(ShutdownSignalException signal) {
        RpcContinuation k = nextOutstandingRpc();
        if (k != null) {
            k.handleShutdownSignal(signal);
        }
    }
    public void transmit(Method mthrows IOException {
        synchronized () {
            transmit(new AMQCommand(m));
        }
    }
    public void transmit(AMQCommand cthrows IOException {
        synchronized () {
            ensureIsOpen();
            quiescingTransmit(c);
        }
    }
    public void quiescingTransmit(Method mthrows IOException {
        synchronized () {
            quiescingTransmit(new AMQCommand(m));
        }
    }
    public void quiescingTransmit(AMQCommand cthrows IOException {
        synchronized () {
            if (c.getMethod().hasContent()) {
                while () {
                    try {
                        .wait();
                    } catch (InterruptedException e) {}
                    // This is to catch a situation when the thread wakes up during
                    // shutdown. Currently, no command that has content is allowed
                    // to send anything in a closing state.
                    ensureIsOpen();
                }
            }
            c.transmit(this);
        }
    }
    public AMQConnection getConnection() {
        return ;
    }
    public interface RpcContinuation {
        void handleCommand(AMQCommand command);
        void handleShutdownSignal(ShutdownSignalException signal);
    }
    public static abstract class BlockingRpcContinuation<T> implements RpcContinuation {
        public final BlockingValueOrException<T, ShutdownSignalException_blocker =
            new BlockingValueOrException<T, ShutdownSignalException>();
        public void handleCommand(AMQCommand command) {
            .setValue(transformReply(command));
        }
        public void handleShutdownSignal(ShutdownSignalException signal) {
            .setException(signal);
        }
        public T getReply() throws ShutdownSignalException
        {
            return .uninterruptibleGetValue();
        }
        public T getReply(int timeout)
            throws ShutdownSignalExceptionTimeoutException
        {
            return .uninterruptibleGetValue(timeout);
        }
        public abstract T transformReply(AMQCommand command);
    }
    public static class SimpleBlockingRpcContinuation
        extends BlockingRpcContinuation<AMQCommand>
    {
        public AMQCommand transformReply(AMQCommand command) {
            return command;
        }
    }
// [GrepCode page footer residue, not part of the source file] New to GrepCode? Check out our FAQ X