Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  //  The contents of this file are subject to the Mozilla Public License
  //  Version 1.1 (the "License"); you may not use this file except in
  //  compliance with the License. You may obtain a copy of the License
  //  at http://www.mozilla.org/MPL/
  //
  //  Software distributed under the License is distributed on an "AS IS"
  //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  //  the License for the specific language governing rights and
  //  limitations under the License.
 //
 //  The Original Code is RabbitMQ.
 //
 //  The Initial Developer of the Original Code is VMware, Inc.
 //  Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
 //
 
 
 package com.rabbitmq.client;
 
 import java.util.Date;
 import java.util.Map;
 
Convenience class which manages a temporary reply queue for simple RPC-style communication. The class is agnostic about the format of RPC arguments / return values. It simply provides a mechanism for sending a message to an exchange with a given routing key, and waiting for a response on a reply queue.
 
 public class RpcClient {
    
Channel we are communicating on
 
     private final Channel _channel;
    
Exchange to send requests to
 
     private final String _exchange;
    
Routing key to use for requests
 
     private final String _routingKey;
    
timeout to use on call responses
 
     private final int _timeout;
    
NO_TIMEOUT value must match convention on com.rabbitmq.utility.BlockingCell.uninterruptibleGet(int)
 
     protected final static int NO_TIMEOUT = -1;

    
Map from request correlation ID to continuation BlockingCell
 
     private final Map<StringBlockingCell<Object>> _continuationMap = new HashMap<StringBlockingCell<Object>>();
    
Contains the most recently-used request correlation ID
 
     private int _correlationId;

    
The name of our private reply queue
 
     private String _replyQueue;
    
Consumer attached to our reply queue
 
     private DefaultConsumer _consumer;

    
Construct a new RpcClient that will communicate on the given channel, sending requests to the given exchange with the given routing key.

Causes the creation of a temporary private autodelete queue.

Parameters:
channel the channel to use for communication
exchange the exchange to connect to
routingKey the routing key
timeout milliseconds before timing out on wait for response
Throws:
java.io.IOException if an error is encountered
See also:
setupReplyQueue()
 
     public RpcClient(Channel channelString exchangeString routingKeyint timeoutthrows IOException {
          = channel;
          = exchange;
          = routingKey;
         if (timeout < throw new IllegalArgumentException("Timeout arguument must be NO_TIMEOUT(-1) or non-negative.");
          = timeout;
          = 0;
 
          = setupReplyQueue();
          = setupConsumer();
     }

    
Construct a new RpcClient that will communicate on the given channel, sending requests to the given exchange with the given routing key.

Causes the creation of a temporary private autodelete queue.

Waits forever for responses (that is, no timeout).

Parameters:
channel the channel to use for communication
exchange the exchange to connect to
routingKey the routing key
Throws:
java.io.IOException if an error is encountered
See also:
setupReplyQueue()
    public RpcClient(Channel channelString exchangeString routingKeythrows IOException {
        this(channelexchangeroutingKey);
    }

    
Private API - ensures the RpcClient is correctly open.

Throws:
java.io.IOException if an error is encountered
    public void checkConsumer() throws IOException {
        if ( == null) {
            throw new EOFException("RpcClient is closed");
        }
    }

    
Public API - cancels the consumer, thus deleting the temporary queue, and marks the RpcClient as closed.

Throws:
java.io.IOException if an error is encountered
    public void close() throws IOException {
        if ( != null) {
            .basicCancel(.getConsumerTag());
             = null;
        }
    }

    
Creates a server-named exclusive autodelete queue to use for receiving replies to RPC requests.

Returns:
the name of the reply queue
Throws:
java.io.IOException if an error is encountered
    protected String setupReplyQueue() throws IOException {
        return .queueDeclare(""falsetruetruenull).getQueue();
    }

    
Registers a consumer on the reply queue.

Returns:
the newly created and registered consumer
Throws:
java.io.IOException if an error is encountered
    protected DefaultConsumer setupConsumer() throws IOException {
        DefaultConsumer consumer = new DefaultConsumer() {
            @Override
            public void handleShutdownSignal(String consumerTag,
                                             ShutdownSignalException signal) {
                synchronized () {
                    for (Entry<StringBlockingCell<Object>> entry : .entrySet()) {
                        entry.getValue().set(signal);
                    }
                     = null;
                }
            }
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                synchronized () {
                    String replyId = properties.getCorrelationId();
                    BlockingCell<Objectblocker = .get(replyId);
                    .remove(replyId);
                    blocker.set(body);
                }
            }
        };
        .basicConsume(trueconsumer);
        return consumer;
    }
    public void publish(AMQP.BasicProperties propsbyte[] message)
        throws IOException
    {
        .basicPublish(propsmessage);
    }
    public byte[] primitiveCall(AMQP.BasicProperties propsbyte[] message)
    {
        checkConsumer();
        BlockingCell<Objectk = new BlockingCell<Object>();
        synchronized () {
            ++;
            String replyId = "" + ;
            props = ((props==null) ? new AMQP.BasicProperties.Builder() : props.builder())
                    .correlationId(replyId).replyTo().build();
            .put(replyIdk);
        }
        publish(propsmessage);
        Object reply = k.uninterruptibleGet();
        if (reply instanceof ShutdownSignalException) {
            ShutdownSignalException sig = (ShutdownSignalExceptionreply;
            ShutdownSignalException wrapper =
                new ShutdownSignalException(sig.isHardError(),
                                            sig.isInitiatedByApplication(),
                                            sig.getReason(),
                                            sig.getReference());
            wrapper.initCause(sig);
            throw wrapper;
        } else {
            return (byte[]) reply;
        }
    }

    
Perform a simple byte-array-based RPC roundtrip.

Parameters:
message the byte array request message to send
Returns:
the byte array response received
Throws:
ShutdownSignalException if the connection dies during our wait
java.io.IOException if an error is encountered
java.util.concurrent.TimeoutException if a response is not received within the configured timeout
    public byte[] primitiveCall(byte[] message)
        return primitiveCall(nullmessage);
    }

    
Perform a simple string-based RPC roundtrip.

Parameters:
message the string request message to send
Returns:
the string response received
Throws:
ShutdownSignalException if the connection dies during our wait
java.io.IOException if an error is encountered
java.util.concurrent.TimeoutException if a timeout occurs before the response is received
    public String stringCall(String message)
    {
        return new String(primitiveCall(message.getBytes()));
    }

    
Perform an AMQP wire-protocol-table based RPC roundtrip

There are some restrictions on the values appearing in the table:
they must be of type java.lang.String, LongString, java.lang.Integer, java.math.BigDecimal, java.util.Date, or (recursively) a java.util.Map of the enclosing type.

Parameters:
message the table to send
Returns:
the table received
Throws:
ShutdownSignalException if the connection dies during our wait
java.io.IOException if an error is encountered
java.util.concurrent.TimeoutException if a timeout occurs before a response is received
    public Map<StringObjectmapCall(Map<StringObjectmessage)
    {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        MethodArgumentWriter writer = new MethodArgumentWriter(new ValueWriter(new DataOutputStream(buffer)));
        writer.writeTable(message);
        writer.flush();
        byte[] reply = primitiveCall(buffer.toByteArray());
        MethodArgumentReader reader =
            new MethodArgumentReader(new ValueReader(new DataInputStream(new ByteArrayInputStream(reply))));
        return reader.readTable();
    }

    
Perform an AMQP wire-protocol-table based RPC roundtrip, first constructing the table from an array of alternating keys (in even-numbered elements, starting at zero) and values (in odd-numbered elements, starting at one)
Restrictions on value arguments apply as in mapCall(java.util.Map).

Parameters:
keyValuePairs alternating {key, value, key, value, ...} data to send
Returns:
the table received
Throws:
ShutdownSignalException if the connection dies during our wait
java.io.IOException if an error is encountered
java.util.concurrent.TimeoutException if a timeout occurs before a response is received
    public Map<StringObjectmapCall(Object[] keyValuePairs)
    {
        Map<StringObjectmessage = new HashMap<StringObject>();
        for (int i = 0; i < keyValuePairs.lengthi += 2) {
            message.put((StringkeyValuePairs[i], keyValuePairs[i + 1]);
        }
        return mapCall(message);
    }

    
Retrieve the channel.

Returns:
the channel to which this client is connected
    public Channel getChannel() {
        return ;
    }

    
Retrieve the exchange.

Returns:
the exchange to which this client is connected
    public String getExchange() {
        return ;
    }

    
Retrieve the routing key.

Returns:
the routing key for messages to this client
    public String getRoutingKey() {
        return ;
    }

    
Retrieve the continuation map.

Returns:
the map of objects to blocking cells for this client
        return ;
    }

    
Retrieve the correlation id.

Returns:
the most recently used correlation id
    public int getCorrelationId() {
        return ;
    }

    
Retrieve the reply queue.

Returns:
the name of the client's reply queue
    public String getReplyQueue() {
        return ;
    }

    
Retrieve the consumer.

Returns:
an interface to the client's consumer object
    public Consumer getConsumer() {
        return ;
    }
New to GrepCode? Check out our FAQ X