  //  The contents of this file are subject to the Mozilla Public License
  //  Version 1.1 (the "License"); you may not use this file except in
  //  compliance with the License. You may obtain a copy of the License
  //  at http://www.mozilla.org/MPL/
  //
  //  Software distributed under the License is distributed on an "AS IS"
  //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  //  the License for the specific language governing rights and
  //  limitations under the License.
 //
 //  The Original Code is RabbitMQ.
 //
 //  The Initial Developer of the Original Code is VMware, Inc.
 //  Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
 //
 
 
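 package com.rabbitmq.client;
 
 import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.utility.Utility;
 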
Convenience class: an implementation of Consumer with straightforward blocking semantics. The general pattern for using QueueingConsumer is as follows:
 // Create connection and channel.
 ConnectionFactory factory = new ConnectionFactory();
 Connection conn = factory.newConnection();
 Channel ch1 = conn.createChannel();

 // Declare a queue and bind it to an exchange.
 String queueName = ch1.queueDeclare().getQueue();
 ch1.queueBind(queueName, exchangeName, queueName);

 // Create the QueueingConsumer and have it consume from the queue
 QueueingConsumer consumer = new QueueingConsumer(ch1);
 ch1.basicConsume(queueName, false, consumer);

 // Process deliveries
 while (/* some condition */) {
     QueueingConsumer.Delivery delivery = consumer.nextDelivery();
     // process delivery
     ch1.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 }
 

For a more complete example, see LogTail in the test/src/com/rabbitmq/examples directory of the source distribution.

Deprecated. QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers were made on the Connection's thread. This had two main drawbacks. First, the Consumer could stall the processing of all Channels on the Connection. Second, if a Consumer made a recursive synchronous call into its Channel, the client would deadlock.

QueueingConsumer provided client code with an easy way to obviate this problem by queueing incoming messages and processing them on a separate, application-managed thread.
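
The hand-off described above can be illustrated without the RabbitMQ client at all. The following is a minimal sketch using only `java.util.concurrent`; the class `HandOffSketch` and its `run` method are hypothetical names invented for this illustration, and plain strings stand in for deliveries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for illustration only; not part of the RabbitMQ client.
class HandOffSketch {

    // Simulates a dispatch thread that only enqueues messages, while the
    // calling (application) thread dequeues and processes them.
    static List<String> run(final List<String> messages) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

        // "Dispatch" side: adding to an unbounded queue never blocks, so slow
        // processing on the application side cannot stall this thread.
        Thread dispatcher = new Thread(new Runnable() {
            @Override public void run() {
                for (String m : messages) {
                    queue.add(m);
                }
            }
        });
        dispatcher.start();

        // Application side: take() blocks until the next message is available.
        List<String> processed = new ArrayList<String>();
        try {
            for (int i = 0; i < messages.size(); i++) {
                processed.add(queue.take().toUpperCase());
            }
            dispatcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }
}
```

Calling `HandOffSketch.run(Arrays.asList("a", "b", "c"))` returns `[A, B, C]`: a single producer feeding a FIFO queue preserves message order.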

The threading behaviour of Connection and Channel has been changed so that each Channel uses a distinct thread for dispatching to Consumers. This prevents Consumers on one Channel holding up Consumers on another and it also prevents recursive calls from deadlocking the client.

As such, it is now safe to implement Consumer directly or to extend DefaultConsumer.

 
 public class QueueingConsumer extends DefaultConsumer {
     private final BlockingQueue<Delivery> _queue;
 
     // When this is non-null the queue is in shutdown mode and nextDelivery should
     // throw a shutdown signal exception.
     private volatile ShutdownSignalException _shutdown;
     private volatile ConsumerCancelledException _cancelled;
 
     // Marker object used to signal the queue is in shutdown mode.
     // It is only there to wake up consumers. The canonical representation
     // of shutting down is the presence of _shutdown.
     // Invariant: This is never on _queue unless _shutdown != null.
     private static final Delivery POISON = new Delivery(null, null, null);
 
     public QueueingConsumer(Channel ch) {
         this(ch, new LinkedBlockingQueue<Delivery>());
    }
    public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q) {
        super(ch);
        this._queue = q;
    }
    @Override public void handleShutdownSignal(String consumerTag,
                                               ShutdownSignalException sig) {
        _shutdown = sig;
        _queue.add(POISON);
    }
    @Override public void handleCancel(String consumerTag) throws IOException {
        _cancelled = new ConsumerCancelledException();
        _queue.add(POISON);
    }
    @Override public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body)
        throws IOException
    {
        checkShutdown();
        this._queue.add(new Delivery(envelope, properties, body));
    }

    
Encapsulates an arbitrary message - simple "bean" holder structure.
    public static class Delivery {
        private final Envelope _envelope;
        private final AMQP.BasicProperties _properties;
        private final byte[] _body;
        public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            _envelope = envelope;
            _properties = properties;
            _body = body;
        }

        
Retrieve the message envelope.

Returns:
the message envelope
        public Envelope getEnvelope() {
            return _envelope;
        }

        
Retrieve the message properties.

Returns:
the message properties
        public BasicProperties getProperties() {
            return _properties;
        }

        
Retrieve the message body.

Returns:
the message body
        public byte[] getBody() {
            return _body;
        }
    }

    
Check if we are in shutdown mode and if so throw an exception.
    private void checkShutdown() {
        if (_shutdown != null)
            throw Utility.fixStackTrace(_shutdown);
    }

    
If delivery is neither POISON nor null, return it.

If delivery, _shutdown and _cancelled are all null, return null.

If delivery is POISON re-insert POISON into the queue and throw an exception if POISONed for no reason.

Otherwise, if we are in shutdown mode or cancelled, throw a corresponding exception.

    private Delivery handle(Delivery delivery) {
        if (delivery == POISON ||
            delivery == null && (_shutdown != null || _cancelled != null)) {
            if (delivery == POISON) {
                _queue.add(POISON);
                if (_shutdown == null && _cancelled == null) {
                    throw new IllegalStateException(
                        "POISON in queue, but null _shutdown and null _cancelled. " +
                        "This should never happen, please report as a BUG");
                }
            }
            if (null != _shutdown)
                throw Utility.fixStackTrace(_shutdown);
            if (null != _cancelled)
                throw Utility.fixStackTrace(_cancelled);
        }
        }
        return delivery;
    }
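
The marker-object logic in handle can be studied in isolation. The sketch below is a simplified, hypothetical model (the class `PoisonSketch` and its methods are invented for illustration, use strings instead of Delivery objects, and omit the cancellation case); it assumes only `java.util.concurrent`. It shows why the marker is put back on the queue: every later reader is woken and sees the same shutdown exception.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the marker logic; not RabbitMQ client API.
class PoisonSketch {
    // Compared by identity (==), so it can never be confused with a real message.
    static final String POISON = new String("POISON");

    final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
    volatile RuntimeException shutdown; // non-null once shut down

    void deliver(String message) {
        queue.add(message);
    }

    void shutDown(RuntimeException cause) {
        shutdown = cause;   // record the reason first...
        queue.add(POISON);  // ...then wake up a blocked reader
    }

    String handle(String item) {
        if (item == POISON || (item == null && shutdown != null)) {
            if (item == POISON) {
                queue.add(POISON); // leave the marker in place for later readers
                if (shutdown == null) {
                    throw new IllegalStateException("POISON without a shutdown cause");
                }
            }
            if (shutdown != null) {
                throw shutdown;
            }
        }
        return item;
    }

    String next() {
        try {
            return handle(queue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Messages enqueued before the shutdown are still returned in order; only once the marker reaches the head of the queue does `next()` start throwing, and it keeps throwing on every subsequent call.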

    
Main application-side API: wait for the next message delivery and return it.

Returns:
the next message
Throws:
java.lang.InterruptedException if an interrupt is received while waiting
ShutdownSignalException if the connection is shut down while waiting
    public Delivery nextDelivery()
        throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
    {
        return handle(_queue.take());
    }

    
Main application-side API: wait for the next message delivery and return it.

Parameters:
timeout - timeout in milliseconds
Returns:
the next message or null if timed out
Throws:
java.lang.InterruptedException if an interrupt is received while waiting
ShutdownSignalException if the connection is shut down while waiting
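    public Delivery nextDelivery(long timeout)
        throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
    {
        return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS));
    }
}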
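
Because the timed variant can return null, callers need to treat null as "nothing arrived in time" rather than as a message. The following is a small sketch of that behaviour using the JDK's `BlockingQueue.poll(long, TimeUnit)` directly; `TimedPollSketch` and `nextOrNull` are hypothetical names for illustration, with strings standing in for deliveries.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the RabbitMQ client.
class TimedPollSketch {

    // Waits up to timeoutMillis for an element; returns null if none arrives.
    static String nextOrNull(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Typical caller-side pattern: distinguish a timeout from a real message.
    static String describe(BlockingQueue<String> queue, long timeoutMillis) {
        String item = nextOrNull(queue, timeoutMillis);
        return item == null ? "timed out" : "got " + item;
    }
}
```

With an empty queue, `describe(queue, 10)` yields `timed out`; after `queue.add("x")`, it yields `got x`.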