Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source.
   * Copyright 2006, Red Hat Middleware LLC, and individual contributors
   * as indicated by the @author tags. See the copyright.txt file in the
   * distribution for a full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
  * the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 package org.jboss.resource.connectionmanager;
 
 import java.util.Set;
 
 
The internal pool implementation

Author(s):
David Jencks
Adrian Brock
Weston Price
Version:
$Revision: 76091 $
 
 {
   
The managed connection factory
 
    private final ManagedConnectionFactory mcf;

   
The connection listener factory
 
    private final ConnectionListenerFactory clf;

   
The default subject
 
    private final Subject defaultSubject;

   
The default connection request information
 
    private final ConnectionRequestInfo defaultCri;

   
The pooling parameters
 
    private final PoolParams poolParams;

   
Copy of the maximum size from the pooling parameters. Dynamic changes to this value are not compatible with the semaphore which cannot change be dynamically changed.
 
    private int maxSize;

   
The available connection event listeners
 
    private ArrayList cls;

   
The permits used to control who can checkout a connection
 
    private final Semaphore permits;

   
The log
 
    private final Logger log;

   
Whether trace is enabled
 
    private final boolean trace;

   
Stats
 
    private final Counter connectionCounter = new Counter();

   
The checked out connections
 
    private final HashSet checkedOut = new HashSet();

   
Whether the pool has been started
 
    private boolean started = false;

   
Whether the pool has been shutdown
 
    private AtomicBoolean shutdown = new AtomicBoolean(false);

   
the max connections ever checked out
 
   private volatile int maxUsedConnections = 0;

   
Create a new internal pool

Parameters:
mcf the managed connection factory
subject the subject
cri the connection request information
poolParams the pooling parameters
log the log
         ConnectionRequestInfo criPoolParams poolParamsLogger log)
   {
      this. = mcf;
      this. = clf;
       = subject;
       = cri;
      this. = poolParams;
      this. = poolParams.maxSize;
      this. = log;
      this. = log.isTraceEnabled();
       = new ArrayList(this.);
       = new Semaphore(this.);
  
      if(poolParams.prefill){
         
         PoolFiller.fillPool(this);
         
      }
   }

   
Initialize the pool
   protected void initialize()
   {
      if (. != 0)
         IdleRemover.registerPool(this.);
      if (. > 0)
      {
         .debug("Registering for background validation at interval " + .);
         ConnectionValidator.registerPool(this.);
      }
   }
   public long getAvailableConnections()
   {
      return .availablePermits();
   }
   public int getMaxConnectionsInUseCount()
   {
      return ;
   }
   public int getConnectionInUseCount()
   {
      return .size();
   }

   
todo distinguish between connection dying while match called and bad match strategy. In latter case we should put it back in the pool.
   {
      subject = (subject == null) ?  : subject;
      cri = (cri == null) ?  : cri;
      long startWait = System.currentTimeMillis();
      try
      {
         .updateBlockTime(System.currentTimeMillis() - startWait);
         
         {
            long poolBlockTime =  System.currentTimeMillis() - startWait ;
            .updateBlockTime(poolBlockTime);
            //We have a permit to get a connection. Is there one in the pool already?
            ConnectionListener cl = null;
            do
            {
               synchronized ()
               {
                  if (.get())
                  {
                     .release();
                     throw new ResourceException("The pool has been shutdown");
                  }
                  if (.size() > 0)
                  {
                     cl = (ConnectionListener.remove(.size() - 1);
                     .add(cl);
                     int size = (int) ( - .availablePermits());
                     if (size > )
                         = size;
                  }
               }
               if (cl != null)
               {
                  //Yes, we retrieved a ManagedConnection from the pool. Does it match?
                  try
                  {
                     Object matchedMC = .matchManagedConnections(Collections.singleton(cl.getManagedConnection()),
                           subjectcri);
                     if (matchedMC != null)
                     {
                        if ()
                           .trace("supplying ManagedConnection from pool: " + cl);
                        cl.grantPermit(true);
                        return cl;
                     }
                     //Match did not succeed but no exception was thrown.
                     //Either we have the matching strategy wrong or the
                     //connection died while being checked.  We need to
                     //distinguish these cases, but for now we always
                     //destroy the connection.
                     .warn("Destroying connection that could not be successfully matched: " + cl);
                     synchronized ()
                     {
                        .remove(cl);
                     }
                     doDestroy(cl);
                     cl = null;
                  }
                  catch (Throwable t)
                  {
                     .warn("Throwable while trying to match ManagedConnection, destroying connection: " + clt);
                     synchronized ()
                     {
                        .remove(cl);
                     }
                     doDestroy(cl);
                     cl = null;
                  }
                  //We made it here, something went wrong and we should validate if we should continue attempting to acquire a connection
                  if(.)
                  {
                     .trace("Fast failing for connection attempt. No more attempts will be made to acquire connection from pool and a new connection will be created immeadiately");
                     break;
                  }
               
               }
            }
            while (.size() > 0);//end of do loop
            //OK, we couldnt find a working connection from the pool.  Make a new one.
            try
            {
               //No, the pool was empty, so we have to make a new one.
               cl = createConnectionEventListener(subjectcri);
               synchronized ()
               {
                  .add(cl);
                  int size = (int) ( - .availablePermits());
                  if (size > )
                      = size;
               }
               //lack of synch on "started" probably ok, if 2 reads occur we will just
               //run fillPool twice, no harm done.
               if ( == false)
               {
                   = true;
                  if (. > 0)
                     PoolFiller.fillPool(this);
               }
               if ()
                  .trace("supplying new ManagedConnection: " + cl);
               cl.grantPermit(true);
               return cl;
            }
            catch (Throwable t)
            {
               .warn("Throwable while attempting to get a new connection: " + clt);
               //return permit and rethrow
               synchronized ()
               {
                  .remove(cl);
               }
               .release();
               JBossResourceException.rethrowAsResourceException("Unexpected throwable while trying to create a connection: " + clt);
               throw new UnreachableStatementException();
            }
         }
         else
         {
            // we timed out
            throw new ResourceException("No ManagedConnections available within configured blocking timeout ( "
                  + . + " [ms] )");
         }
      }
      catch (InterruptedException ie)
      {
         long end = System.currentTimeMillis() - startWait;
         .updateBlockTime(end);
         throw new ResourceException("Interrupted while requesting permit! Waited " + end + " ms");
      }
   }
   public void returnConnection(ConnectionListener clboolean kill)
   {
      synchronized ()
      {
         if (cl.getState() == .)
         {
            if ()
               .trace("ManagedConnection is being returned after it was destroyed" + cl);
            if (cl.hasPermit())
            {
               // release semaphore
               cl.grantPermit(false);
               .release();
            }
            return;
         }
      }
      if ()
         .trace("putting ManagedConnection back into pool kill=" + kill + " cl=" + cl);
      try
      {
         cl.getManagedConnection().cleanup();
      }
      catch (ResourceException re)
      {
         .warn("ResourceException cleaning up ManagedConnection: " + clre);
         kill = true;
      }
      synchronized ()
      {
         // We need to destroy this one
         if (cl.getState() == . || cl.getState() == .)
            kill = true;
         .remove(cl);
         // This is really an error
         if (kill == false && .size() >= .)
         {
            .warn("Destroying returned connection, maximum pool size exceeded " + cl);
            kill = true;
         }
         // If we are destroying, check the connection is not in the pool
         if (kill)
         {
            // Adrian Brock: A resource adapter can asynchronously notify us that
            // a connection error occurred.
            // This could happen while the connection is not checked out.
            // e.g. JMS can do this via an ExceptionListener on the connection.
            // I have twice had to reinstate this line of code, PLEASE DO NOT REMOVE IT!
            .remove(cl);
         }
         // return to the pool
         else
         {
            cl.used();
            if (.contains(cl) == false)
               .add(cl);
            else
               .warn("Attempt to return connection twice (ignored): " + clnew Throwable("STACKTRACE"));
         }
         if (cl.hasPermit())
         {
            // release semaphore
            cl.grantPermit(false);
            .release();
         }
      }
      if (kill)
      {
         if ()
            .trace("Destroying returned connection " + cl);
         doDestroy(cl);
      }
   }
   public void flush()
   {
      ArrayList destroy = null;
      synchronized ()
      {
         if ()
            .trace("Flushing pool checkedOut=" +  + " inPool=" + );
         // Mark checked out connections as requiring destruction
         for (Iterator i = .iterator(); i.hasNext();)
         {
            ConnectionListener cl = (ConnectionListeneri.next();
            if ()
               .trace("Flush marking checked out connection for destruction " + cl);
            cl.setState(.);
         }
         // Destroy connections in the pool
         while (.size() > 0)
         {
            ConnectionListener cl = (ConnectionListener.remove(0);
            if (destroy == null)
               destroy = new ArrayList();
            destroy.add(cl);
         }
      }
      // We need to destroy some connections
      if (destroy != null)
      {
         for (int i = 0; i < destroy.size(); ++i)
         {
            ConnectionListener cl = (ConnectionListenerdestroy.get(i);
            if ()
               .trace("Destroying flushed connection " + cl);
            doDestroy(cl);
         }
         // We destroyed something, check the minimum.
         if (.get() == false && . > 0)
            PoolFiller.fillPool(this);
      }
   }
   public void removeIdleConnections()
   {
      ArrayList destroy = null;
      long timeout = System.currentTimeMillis() - .;
      while (true)
      {
         synchronized ()
         {
            
            // Nothing left to destroy
            if (.size() == 0)
               break;
            // Check the first in the list
            ConnectionListener cl = (ConnectionListener.get(0);
            if (cl.isTimedOut(timeout) && shouldRemove())
            {
               .incTimedOut();
               // We need to destroy this one
               .remove(0);
               if (destroy == null)
                  destroy = new ArrayList();
               destroy.add(cl);
            }
            else
            {
               //They were inserted chronologically, so if this one isn't timed out, following ones won't be either.
               break;
            }
         }
      }
      // We found some connections to destroy
      if (destroy != null)
      {
         for (int i = 0; i < destroy.size(); ++i)
         {
            ConnectionListener cl = (ConnectionListenerdestroy.get(i);
            if ()
               .trace("Destroying timedout connection " + cl);
            doDestroy(cl);
         }
         // We destroyed something, check the minimum.
         if (.get() == false && . > 0)
            PoolFiller.fillPool(this);
      }
   }

   
   
For testing
   public void shutdownWithoutClear()
   {
      IdleRemover.unregisterPool(this);
      IdleRemover.waitForBackgroundThread();
      ConnectionValidator.unRegisterPool(this);
      ConnectionValidator.waitForBackgroundThread();
      fillToMin();
      .set(true);
   }
   public void shutdown()
   {
      .set(true);
      IdleRemover.unregisterPool(this);
      ConnectionValidator.unRegisterPool(this);
      flush();
   }
   public void fillToMin()
   {
      while (true)
      {
         // Get a permit - avoids a race when the pool is nearly full
         // Also avoids unnessary fill checking when all connections are checked out
         try
         {
            {
               try
               {
                  if (.get())
                     return;
                  // We already have enough connections
                  if (getMinSize() - .getGuaranteedCount() <= 0)
                     return;
                  // Create a connection to fill the pool
                  try
                  {
                     ConnectionListener cl = createConnectionEventListener();
                     synchronized ()
                     {
                        if ()
                           .trace("Filling pool cl=" + cl);
                        .add(cl);
                     }
                  }
                  catch (ResourceException re)
                  {
                     .warn("Unable to fill pool "re);
                     return;
                  }
               }
               finally
               {
                  .release();
               }
            }
         }
         catch (InterruptedException ignored)
         {
            .trace("Interrupted while requesting permit in fillToMin");
         }
      }
   }
   public int getConnectionCount()
   {
      return .getCount();
   }
   
   public long getTotalBlockTime()
   {
   }
   
   public int getTimedOut()
   {
      return .getTimedOut();
   }
   
   public long getAverageBlockTime()
   {
   }
   
   public long getMaxWaitTime()
   {
       return .getMaxWaitTime();
   }
   public int getConnectionCreatedCount()
   {
      return .getCreatedCount();
   }
   public int getConnectionDestroyedCount()
   {
   }
   
   {
      synchronized ()
      {         
         Set result = new HashSet();
         result.addAll(); 
         result.addAll(); 
         return result;
      }
   }

   
Create a connection event listener

Parameters:
subject the subject
cri the connection request information
Returns:
the new listener
Throws:
javax.resource.ResourceException for any error
         throws ResourceException
   {
      ManagedConnection mc = .createManagedConnection(subjectcri);
      .inc();
      try
      {
         return .createConnectionListener(mcthis);
      }
      catch (ResourceException re)
      {
         .dec();
         mc.destroy();
         throw re;
      }
   }

   
Destroy a connection

Parameters:
cl the connection to destroy
   private void doDestroy(ConnectionListener cl)
   {
      if (cl.getState() == .)
      {
         .trace("ManagedConnection is already destroyed " + cl);
         return;
      }
      .dec();
      try
      {
         cl.getManagedConnection().destroy();
      }
      catch (Throwable t)
      {
         .debug("Exception destroying ManagedConnection " + clt);
      }
   }
   
   private boolean shouldRemove()
   {      
      boolean remove = true;
      
      if(.)
      {
         remove = .size() > .;
         
         .trace("StrictMin is active. Current connection will be removed is " + remove);
         
      }
      
      return remove;
      
   }
   
   public void validateConnections() throws Exception
   {
      if ()
         .trace("Attempting to  validate connections for pool " + this);
      {
         boolean destroyed = false;
         try
         {
            while (true)
            {
               ConnectionListener cl = null;
               synchronized ()
               {
                  if (.size() == 0)
                  {
                     break;
                  }
                  cl = removeForFrequencyCheck();
               }
               if (cl == null)
               {
                  break;
               }
               try
               {
                  Set candidateSet = Collections.singleton(cl.getManagedConnection());
                  if ( instanceof ValidatingManagedConnectionFactory)
                  {
                     ValidatingManagedConnectionFactory vcf = (ValidatingManagedConnectionFactory;
                     candidateSet = vcf.getInvalidConnections(candidateSet);
                     if (candidateSet != null && candidateSet.size() > 0)
                     {
                        if (cl.getState() != .)
                        {
                           doDestroy(cl);
                           destroyed = true;
                        }
                     }
                  }
                  else
                  {
                     .warn("warning: background validation was specified with a non compliant ManagedConnectionFactory interface.");
                  }
               }
               finally
               {
                  if(!destroyed)
                  {
                     synchronized ()
                     {
                        returnForFrequencyCheck(cl);
                     }
                     
                  }
               }
            }
         }
         finally
         {
            .release();
            if (destroyed && .get() == false && . > 0)
            {
               PoolFiller.fillPool(this);
            }
         }
      }
   }
   {
      .debug("Checking for connection within frequency");
      ConnectionListener cl = null;
      for (Iterator iter = .iterator(); iter.hasNext();)
      {
         cl = (ConnectionListeneriter.next();
         long lastCheck = cl.getLastValidatedTime();
         if ((System.currentTimeMillis() - lastCheck) >= .)
         {
            .remove(cl);
            break;
         }
         else
         {
            cl = null;
         }
      }
      return cl;
   }
   {
      .debug("Returning for connection within frequency");
      cl.setLastValidatedTime(System.currentTimeMillis());
      .add(cl);
   }
   
Guard against configurations or dynamic changes that may increase the minimum beyond the maximum
   private int getMinSize()
   {
      if (. > )
         return ;
      
      return .;
   }
   public static class PoolParams
   {
	   public int minSize = 0;
		public int maxSize = 10;
		public int blockingTimeout = 30000; // milliseconds
		public long idleTimeout = 1000 * 60 * 30; // milliseconds, 30 minutes.
		public long backgroundInterval = 0;
		public boolean prefill;
      
        public boolean stictMin;
        
        //Do we want to immeadiately break when a connection cannot be matched and not evaluate the rest of the pool?
        public boolean useFastFail;
   }

   
Stats
   private static class Counter
   {
      private int created = 0;
      private int destroyed = 0;
      // Total wait time to get Connection from Pool.
      private long totalBlockTime;
      
      // Idle timed out Connection Count.
      private int timedOut;
      // The maximum wait time */      
      private long maxWaitTime;
      
      synchronized int getGuaranteedCount()
      {
         return  - ;
      }
      int getCount()
      {
         return  - ;
      }
      int getCreatedCount()
      {
         return ;
      }
      int getDestroyedCount()
      {
         return ;
      }
      synchronized void inc()
      {
         ++;
      }
      synchronized void dec()
      {
         ++;
      }
   
      synchronized void updateBlockTime(long latest)
      {
          += latest;
         if ( < latest)
             = latest;
      }
      long getTotalBlockTime()
      {
         return ;
      }
 
      int getTimedOut()
      {
         return ;
      }
      
      synchronized void incTimedOut()
      {
         ++;
      }
      long getMaxWaitTime()
      {
          return ;
      }
   }
New to GrepCode? Check out our FAQ X