Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source.
   * Copyright 2006, Red Hat Middleware LLC, and individual contributors
   * as indicated by the @author tags. See the copyright.txt file in the
   * distribution for a full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
  * the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 package org.jboss.resource.adapter.jms.inflow;
 
 
 
A generic jms session pool.

Author(s):
Adrian Brock
Version:
$Revision: 76819 $
 
 public class JmsServerSessionPool implements ServerSessionPool
 {
   
The logger
 
    private static final Logger log = Logger.getLogger(JmsServerSessionPool.class);
      
   
The activation
 
    JmsActivation activation;

   
The consumer
 
The server sessions
 
    ArrayList serverSessions = new ArrayList();
   
   
Whether the pool is stopped
 
    boolean stopped = false;
   
   
The number of sessions
 
    int sessionCount = 0;
   
   
   
Create a new session pool

Parameters:
activation the jms activation
 
    public JmsServerSessionPool(JmsActivation activation)
    {
       this. = activation;
    }

   

Returns:
the activation
 
    public JmsActivation getActivation()
    {
       return ;
    }
   
   
Start the server session pool

Throws:
Exeption for any error
 
    public void start() throws Exception
    {
       setupSessions();
       setupConsumer();
    }

   
Stop the server session pool
 
    public void stop()
    {
       teardownConsumer();
       teardownSessions();
    }
   
   {
      boolean trace = .isTraceEnabled();
      if (trace)
         .trace("getServerSession");
      ServerSession result = null;
      
      try
      {
         synchronized ()
         {
            while (true)
            {
               int sessionsSize = .size();
               
               if ()
                  throw new Exception("Cannot get a server session after the pool is stopped");
               
               else if (sessionsSize > 0)
               {
                  result = (ServerSession.remove(sessionsSize-1);
                  break;
               }
               
               else
               {
                  try
                  {
                     .wait();
                  }
                  catch (InterruptedException ignored)
                  {
                  }
               }
            }
         }
      }
      catch (Throwable t)
      {
         throw new JMSException("Unable to get a server session " + t);
      }
      
      if (trace)
         .trace("Returning server session " + result);
      
      return result;
   }

   
Return the server session

Parameters:
session the session
   protected void returnServerSession(JmsServerSession session)
   {
      synchronized ()
      {
         if ()
         {
            session.teardown();
            --;
         }
         else
            .add(session);
         .notifyAll();
      }
   }
   
   
Setup the sessions

Throws:
Exeption for any error
   protected void setupSessions() throws Exception
   {
      ArrayList clonedSessions = null;
      
      // Create the sessions
      synchronized ()
      {
         for (int i = 0; i < spec.getMaxSessionInt(); ++i)
         {
            JmsServerSession session = new JmsServerSession(this);
            .add(session);
         }
          = .size();
         clonedSessions = (ArrayList.clone();
      }
      
      // Start the sessions
      for (int i = 0; i < clonedSessions.size(); ++ i)
      {
         JmsServerSession session = (JmsServerSessionclonedSessions.get(i);
         session.setup();
      }
   }

   
Stop the sessions
   protected void teardownSessions()
   {
      synchronized ()
      {
         // Disallow any new sessions
          = true;
         .notifyAll();
         
         // Stop inactive sessions
         for (int i = 0; i < .size(); ++i)
         {
            JmsServerSession session = (JmsServerSession.get(i);
            session.teardown();
            --;
         }
         .clear();
         {        
            int attempts = 0;
            int forceClearAttempts = .getActivationSpec().getForceClearAttempts();
            long forceClearInterval = .getActivationSpec().getForceClearOnShutdownInterval();
            
            .trace(this + " force clear behavior in effect. Waiting for " + forceClearInterval + " milliseconds for " + forceClearAttempts + " attempts.");
           
            while(( > 0) && (attempts < forceClearAttempts))
            {
               try
               {
                  int currentSessions = ;
                  .wait(forceClearInterval);
                  // Number of session didn't change
                  if ( == currentSessions)
                  {
                     ++attempts;
                     .trace(this + " clear attempt failed " + attempts); 
                  }
               }
               catch(InterruptedException ignore)
               {
               }
            
            }
         }
         else
         {
            // Wait for inuse sessions
            while ( > 0)
            {
               try
               {
                  .wait();
               }
               catch (InterruptedException ignore)
               {
               }
            }
         }
      }
   }
   
   
Setup the connection consumer

Throws:
Exeption for any error
   protected void setupConsumer() throws Exception
   {
      Connection connection = .getConnection();
      String selector = spec.getMessageSelector();
      int maxMessages = spec.getMaxMessagesInt();
      if (.isTopic())
      {
         Topic topic = (Topic.getDestination();
         String subscriptionName = spec.getSubscriptionName();
         if (spec.isDurable())
             = connection.createDurableConnectionConsumer(topicsubscriptionNameselectorthismaxMessages);
         else
             = connection.createConnectionConsumer(topicselectorthismaxMessages);
      }
      else
      {
         Queue queue = (Queue.getDestination();
          = connection.createConnectionConsumer(queueselectorthismaxMessages);
      }
      .debug("Created consumer " + );
   }

   
Stop the connection consumer
   protected void teardownConsumer()
   {
      try
      {
         if ( != null)
         {
            .debug("Closing the " + );
            .close();
         }
      }
      catch (Throwable t)
      {
         .debug("Error closing the consumer " + t);
      }
   }
New to GrepCode? Check out our FAQ X