Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  //  The contents of this file are subject to the Mozilla Public License
  //  Version 1.1 (the "License"); you may not use this file except in
  //  compliance with the License. You may obtain a copy of the License
  //  at http://www.mozilla.org/MPL/
  //
  //  Software distributed under the License is distributed on an "AS IS"
  //  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
  //  the License for the specific language governing rights and
  //  limitations under the License.
 //
 //  The Original Code is RabbitMQ.
 //
 //  The Initial Developer of the Original Code is VMware, Inc.
 //  Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
 //
 
 package com.rabbitmq.client.impl;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 /** Manages a set of channels, indexed by channel number (<code>1.._channelMax</code>). */
 
 
 public final class ChannelManager {
     private static final int SHUTDOWN_TIMEOUT_SECONDS = 10;

    
Monitor for _channelMap and channelNumberAllocator
 
     private final Object monitor = new Object();
        
Mapping from 1.._channelMax to ChannelN instance
 
         private final Map<IntegerChannelN_channelMap = new HashMap<IntegerChannelN>();
         private final IntAllocator channelNumberAllocator;
 
     private final ConsumerWorkService workService;
 
     private final Set<CountDownLatchshutdownSet = new HashSet<CountDownLatch>();

    
Maximum channel number available on this connection.
 
     private final int _channelMax;
 
     public int getChannelMax(){
       return ;
     }
 
     public ChannelManager(ConsumerWorkService workServiceint channelMax) {
         if (channelMax == 0) {
             // The framing encoding only allows for unsigned 16-bit integers
             // for the channel number
             channelMax = (1 << 16) - 1;
         }
          = channelMax;
          = new IntAllocator(1, channelMax);
         
         this. = workService;
     }

    
Looks up a channel on this connection.

Parameters:
channelNumber the number of the required channel
Returns:
the channel on this connection with number channelNumber
Throws:
UnknownChannelException if there is no channel with number channelNumber on this connection
 
     public ChannelN getChannel(int channelNumber) {
         synchronized (this.) {
             ChannelN ch = .get(channelNumber);
             if(ch == nullthrow new UnknownChannelException(channelNumber);
             return ch;
         }
     }
 
     public void handleSignal(ShutdownSignalException signal) {
         Set<ChannelNchannels;
         synchronized(this.) {
             channels = new HashSet<ChannelN>(.values());
         }
         for (ChannelN channel : channels) {
             releaseChannelNumber(channel);
             channel.processShutdownSignal(signaltruetrue);
             .add(channel.getShutdownLatch());
         }
         scheduleShutdownProcessing();
     }
 
     private void scheduleShutdownProcessing() {
         final Set<CountDownLatchsdSet = new HashSet<CountDownLatch>();
         final ConsumerWorkService ssWorkService = ;
         Thread shutdownThread = new Threadnew Runnable() {
             public void run() {
                 for (CountDownLatch latch : sdSet) {
                     try { latch.await(.); } catch (Throwable e) { }
                }
                ssWorkService.shutdown();
            }}, "ConsumerWorkServiceShutdown");
        shutdownThread.setDaemon(true);
        shutdownThread.start();
    }
    public ChannelN createChannel(AMQConnection connectionthrows IOException {
        ChannelN ch;
        synchronized (this.) {
            int channelNumber = .allocate();
            if (channelNumber == -1) {
                return null;
            } else {
                ch = addNewChannel(connectionchannelNumber);
            }
        }
        ch.open(); // now that it's been safely added
        return ch;
    }
    public ChannelN createChannel(AMQConnection connectionint channelNumberthrows IOException {
        ChannelN ch;
        synchronized (this.) {
            if (.reserve(channelNumber)) {
                ch = addNewChannel(connectionchannelNumber);
            } else {
                return null;
            }
        }
        ch.open(); // now that it's been safely added
        return ch;
    }
    private ChannelN addNewChannel(AMQConnection connectionint channelNumberthrows IOException {
        if (.containsKey(channelNumber)) {
            // That number's already allocated! Can't do it
            // This should never happen unless something has gone
            // badly wrong with our implementation.
            throw new IllegalStateException("We have attempted to "
                    + "create a channel with a number that is already in "
                    + "use. This should never happen. "
                    + "Please report this as a bug.");
        }
        ChannelN ch = new ChannelN(connectionchannelNumberthis.);
        .put(ch.getChannelNumber(), ch);
        return ch;
    }

    
Remove the channel from the channel map and free the number for re-use. This method must be safe to call multiple times on the same channel. If it is not then things go badly wrong.
    public void releaseChannelNumber(ChannelN channel) {
        // Warning, here be dragons. Not great big ones, but little baby ones
        // which will nibble on your toes and occasionally trip you up when
        // you least expect it. (Pixies? HP2)
        // Basically, there's a race that can end us up here. It almost never
        // happens, but it's easier to repair it when it does than prevent it
        // from happening in the first place.
        // If we end up doing a Channel.close in one thread and a Channel.open
        // with the same channel number in another, the two can overlap in such
        // a way as to cause disconnectChannel on the old channel to try to
        // remove the new one. Ideally we would fix this race at the source,
        // but it's much easier to just catch it here.
        synchronized (this.) {
            int channelNumber = channel.getChannelNumber();
            ChannelN existing = .remove(channelNumber);
            // Nothing to do here. Move along.
            if (existing == null)
                return;
            // Oops, we've gone and stomped on someone else's channel. Put it
            // back and pretend we didn't touch it.
            else if (existing != channel) {
                .put(channelNumberexisting);
                return;
            }
            .free(channelNumber);
        }
    }
New to GrepCode? Check out our FAQ X