Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  //  The contents of this file are subject to the Mozilla Public License
  //  Version 1.1 (the "License"); you may not use this file except in
  //  compliance with the License. You may obtain a copy of the License
  //  at http://www.mozilla.org/MPL/
  //  Software distributed under the License is distributed on an "AS IS"
  //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  //  the License for the specific language governing rights and
  //  limitations under the License.
 //  The Original Code is RabbitMQ.
 //  The Initial Developer of the Original Code is VMware, Inc.
 //  Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
 package com.rabbitmq.client.impl;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 /**
  * Manages a set of channels, indexed by channel number (1.._channelMax).
  */
 public final class ChannelManager {
     private static final int SHUTDOWN_TIMEOUT_SECONDS = 10;

     /** Monitor for _channelMap and channelNumberAllocator */
     private final Object monitor = new Object();
     /** Mapping from 1.._channelMax to ChannelN instance */
         private final Map<IntegerChannelN_channelMap = new HashMap<IntegerChannelN>();
         private final IntAllocator channelNumberAllocator;
     private final ConsumerWorkService workService;
     private final Set<CountDownLatchshutdownSet = new HashSet<CountDownLatch>();

     /** Maximum channel number available on this connection. */
     private final int _channelMax;
     public int getChannelMax(){
       return ;
     public ChannelManager(ConsumerWorkService workServiceint channelMax) {
         if (channelMax == 0) {
             // The framing encoding only allows for unsigned 16-bit integers
             // for the channel number
             channelMax = (1 << 16) - 1;
          = channelMax;
          = new IntAllocator(1, channelMax);
         this. = workService;

     /**
      * Looks up a channel on this connection.
      *
      * @param channelNumber the number of the required channel
      * @return the channel on this connection with number channelNumber
      * @throws UnknownChannelException if there is no channel with number channelNumber on this connection
      */
     public ChannelN getChannel(int channelNumber) {
         synchronized (this.) {
             ChannelN ch = .get(channelNumber);
             if(ch == nullthrow new UnknownChannelException(channelNumber);
             return ch;
     /**
      * Copies a collection's values into a new HashSet inside a synchronized
      * block and then iterates over that copy.
      *
      * NOTE(review): this extract is incomplete. The declaration of
      * {@code channels}, the lock object after {@code this.}, the identifier
      * before {@code .values()}, the loop body and all closing braces are
      * missing. Restore them from the upstream source before compiling.
      *
      * @param signal the shutdown signal; its use is not visible in this extract
      */
     public void handleSignal(ShutdownSignalException signal) {
         synchronized(this.) {
             // Take the copy while inside the synchronized block.
             channels = new HashSet<ChannelN>(.values());
         // NOTE(review): presumably the synchronized block closes before this loop - confirm.
         for (ChannelN channel : channels) {
     /**
      * Builds a thread named "ConsumerWorkServiceShutdown" whose task awaits
      * each latch in a local set.
      *
      * NOTE(review): this extract is incomplete. The arguments to
      * {@code latch.await}, the right-hand side of {@code ssWorkService},
      * whatever follows the loop inside {@code run()}, and the code that
      * starts the thread are missing. Restore them from the upstream source.
      */
     private void scheduleShutdownProcessing() {
         final Set<CountDownLatchsdSet = new HashSet<CountDownLatch>();
         final ConsumerWorkService ssWorkService = ;
         Thread shutdownThread = new Threadnew Runnable() {
             public void run() {
                 for (CountDownLatch latch : sdSet) {
                     // Anything thrown while awaiting a latch is caught and ignored.
                     try { latch.await(.); } catch (Throwable e) { }
            }}, "ConsumerWorkServiceShutdown");
    public ChannelN createChannel(AMQConnection connectionthrows IOException {
        ChannelN ch;
        synchronized (this.) {
            int channelNumber = .allocate();
            if (channelNumber == -1) {
                return null;
            } else {
                ch = addNewChannel(connectionchannelNumber);
        }; // now that it's been safely added
        return ch;
    public ChannelN createChannel(AMQConnection connectionint channelNumberthrows IOException {
        ChannelN ch;
        synchronized (this.) {
            if (.reserve(channelNumber)) {
                ch = addNewChannel(connectionchannelNumber);
            } else {
                return null;
        }; // now that it's been safely added
        return ch;
    private ChannelN addNewChannel(AMQConnection connectionint channelNumberthrows IOException {
        if (.containsKey(channelNumber)) {
            // That number's already allocated! Can't do it
            // This should never happen unless something has gone
            // badly wrong with our implementation.
            throw new IllegalStateException("We have attempted to "
                    + "create a channel with a number that is already in "
                    + "use. This should never happen. "
                    + "Please report this as a bug.");
        ChannelN ch = new ChannelN(connectionchannelNumberthis.);
        .put(ch.getChannelNumber(), ch);
        return ch;

    /**
     * Remove the channel from the channel map and free the number for re-use.
     * This method must be safe to call multiple times on the same channel. If
     * it is not then things go badly wrong.
     */
    public void releaseChannelNumber(ChannelN channel) {
        // Warning, here be dragons. Not great big ones, but little baby ones
        // which will nibble on your toes and occasionally trip you up when
        // you least expect it. (Pixies? HP2)
        // Basically, there's a race that can end us up here. It almost never
        // happens, but it's easier to repair it when it does than prevent it
        // from happening in the first place.
        // If we end up doing a Channel.close in one thread and a Channel.open
        // with the same channel number in another, the two can overlap in such
        // a way as to cause disconnectChannel on the old channel to try to
        // remove the new one. Ideally we would fix this race at the source,
        // but it's much easier to just catch it here.
        synchronized (this.) {
            int channelNumber = channel.getChannelNumber();
            ChannelN existing = .remove(channelNumber);
            // Nothing to do here. Move along.
            if (existing == null)
            // Oops, we've gone and stomped on someone else's channel. Put it
            // back and pretend we didn't touch it.
            else if (existing != channel) {
New to GrepCode? Check out our FAQ X