Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * JBoss, Home of Professional Open Source.
   * Copyright 2006, Red Hat Middleware LLC, and individual contributors
   * as indicated by the @author tags. See the copyright.txt file in the
   * distribution for a full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
  * the License, or (at your option) any later version.
  *
  * This software is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  * Lesser General Public License for more details.
  *
  * You should have received a copy of the GNU Lesser General Public
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 package org.jboss.resource.adapter.jms.inflow;
 
 
A generic jms session pool.

Author(s):
Adrian Brock
Weston Price
Version:
$Revision: 71554 $
 
 public class JmsServerSession implements ServerSessionMessageListenerWork,
       WorkListener
 {
   
The log
 
    private static final Logger log = Logger.getLogger(JmsServerSession.class);

   
The session pool
 
The transacted flag
 
    boolean transacted;

   
The acknowledge mode
 
    int acknowledge;

   
The session
 
    Session session;

   
Any XA session
 
    XASession xaSession;

   
The endpoint
 
    MessageEndpoint endpoint;

   
Any DLQ handler
 
 
Create a new JmsServerSession

Parameters:
pool the server session pool
 
    {
       this. = pool;
 
    }

   
Setup the session
 
    public void setup() throws Exception
    {
       JmsActivation activation = .getActivation();
      JmsActivationSpec spec = activation.getActivationSpec();
       = activation.getDLQHandler();
      Connection connection = activation.getConnection();
      // Create the session
      if (connection instanceof XAConnection
            && activation.isDeliveryTransacted())
      {
          = ((XAConnectionconnection).createXASession();
          = .getSession();
      } else
      {
          = spec.isSessionTransacted();
          = spec.getAcknowledgeModeInt();
          = connection.createSession();
      }
      // Get the endpoint
      MessageEndpointFactory endpointFactory = activation
            .getMessageEndpointFactory();
      XAResource xaResource = null;
      if (activation.isDeliveryTransacted() &&  != null)
         xaResource = .getXAResource();
       = endpointFactory.createEndpoint(xaResource);
      // Set the message listener
      .setMessageListener(this);
   }

   
Stop the session
   public void teardown()
   {
      try
      {
         if ( != null)
            .release();
      } catch (Throwable t)
      {
         .debug("Error releasing endpoint " + t);
      }
      try
      {
         if ( != null)
            .close();
      } catch (Throwable t)
      {
         .debug("Error releasing xaSession " + t);
      }
      try
      {
         if ( != null)
            .close();
      } catch (Throwable t)
      {
         .debug("Error releasing session " + t);
      }
   }
   public void onMessage(Message message)
   {
      try
      {
         try
         {
            if ( == null
                  || .handleRedeliveredMessage(message) == false)
            {
               MessageListener listener = (MessageListener;
               listener.onMessage(message);
            }
         } finally
         {
            .afterDelivery();
            if ( != null)
               .messageDelivered(message);
         }
      }
      catch (Throwable t)
      {
         .error("Unexpected error delivering message " + messaget);
         if ( != null)
            .error();
      }
   }
   public Session getSession() throws JMSException
   {
      return ;
   }
   public void start() throws JMSException
   {
      JmsActivation activation = .getActivation();
      WorkManager workManager = activation.getWorkManager();
      try
      {
         workManager.scheduleWork(this, 0, nullthis);
      } catch (WorkException e)
      {
         .error("Unable to schedule work"e);
         throw new JMSException("Unable to schedule work: " + e.toString());
      }
   }
   public void run()
   {
      try
      {
      } catch (Throwable t)
      {
         .error("Error creating transaction demarcation. Cannot continue.");
         return;
      }
      try
      {
         .run();
      } catch (Throwable t)
      {
         if ( != null)
            .error();
      } finally
      {
         if ( != null)
            .end();
          = null;
      }
   }
   {
      return new DemarcationStrategyFactory().getStrategy();
   }
   public void release()
   {
   }
   public void workAccepted(WorkEvent e)
   {
   }
   public void workCompleted(WorkEvent e)
   {
      .returnServerSession(this);
   }
   public void workRejected(WorkEvent e)
   {
      .returnServerSession(this);
   }
   public void workStarted(WorkEvent e)
   {
   }
   private class DemarcationStrategyFactory
   {
      {
         TransactionDemarcationStrategy current = null;
         final JmsActivationSpec spec = .getActivation()
               .getActivationSpec();
         final JmsActivation activation = .getActivation();
         if (activation.isDeliveryTransacted() &&  != null)
         {
            try
            {
               current = new XATransactionDemarcationStrategy();
            } catch (Throwable t)
            {
               .error(this + " error creating transaction demarcation "t);
            }
         } else
         {
            return new LocalDemarcationStrategy();
         }
         return current;
      }
   }
   private interface TransactionDemarcationStrategy
   {
      void error();
      void end();
   }
   private class LocalDemarcationStrategy implements
   {
      public void end()
      {
         final JmsActivationSpec spec = .getActivation()
               .getActivationSpec();
         if (spec.isSessionTransacted())
         {
            if ( != null)
            {
               try
               {
                  .commit();
               } catch (JMSException e)
               {
                  .error("Failed to commit session transaction"e);
               }
            }
         }
      }
      public void error()
      {
         final JmsActivationSpec spec = .getActivation()
               .getActivationSpec();
         if (spec.isSessionTransacted())
         {
            if ( != null)
               try
               {
                  /*
                   * Looks strange, but this basically means
                   * 
                   * If the underlying connection was non-XA and the transaction
                   * attribute is REQUIRED we rollback. Also, if the underlying
                   * connection was non-XA and the transaction attribute is
                   * NOT_SUPPORT and the non standard redelivery behavior is
                   * enabled we rollback to force redelivery.
                   * 
                   */
                  if (.getActivation().isDeliveryTransacted()
                        || spec.getRedeliverUnspecified())
                  {
                     .rollback();
                  }
               } catch (JMSException e)
               {
                  .error("Failed to rollback session transaction"e);
               }
         }
      }
   }
   private class XATransactionDemarcationStrategy implements
   {
      boolean trace = .isTraceEnabled();
      Transaction trans = null;
      public XATransactionDemarcationStrategy() throws Throwable
      {
         final int timeout = .getActivation().getActivationSpec()
               .getTransactionTimeout();
         if (timeout > 0)
         {
            .trace("Setting transactionTimeout for JMSSessionPool to "
                  + timeout);
            .setTransactionTimeout(timeout);
         }
         .begin();
         try
         {
             = .getTransaction();
            if ()
               .trace(JmsServerSession.this + " using tx=" + );
            if ( != null)
            {
               XAResource res = .getXAResource();
               if (!.enlistResource(res))
               {
                  throw new JMSException("could not enlist resource");
               }
               if ()
                  .trace(JmsServerSession.this + " XAResource '" + res
                        + "' enlisted.");
            }
         } catch (Throwable t)
         {
            try
            {
               .rollback();
            } catch (Throwable ignored)
            {
               .trace(JmsServerSession.this
                     + " ignored error rolling back after failed enlist",
                     ignored);
            }
            throw t;
         }
      }
      public void error()
      {
         // Mark for tollback TX via TM
         try
         {
            if ()
               .trace(JmsServerSession.this
                     + " using TM to mark TX for rollback tx=" + );
            .setRollbackOnly();
         } catch (Throwable t)
         {
            
                  .error(
                        JmsServerSession.this + " failed to set rollback only",
                        t);
         }
      }
      public void end()
      {
         try
         {
            // Use the TM to commit the Tx (assert the correct association)
            Transaction currentTx = .getTransaction();
            if (.equals(currentTx) == false)
               throw new IllegalStateException(
                     "Wrong tx association: expected " +  + " was "
                           + currentTx);
            // Marked rollback
            if (.getStatus() == .)
            {
               if ()
                  .trace(JmsServerSession.this
                        + " rolling back JMS transaction tx=" + );
               // actually roll it back
               .rollback();
               // NO XASession? then manually rollback.
               // This is not so good but
               // it's the best we can do if we have no XASession.
               if ( == null
                     && .getActivation().isDeliveryTransacted())
               {
                  .rollback();
               }
            }
            else if (.getStatus() == .)
            {
               // Commit tx
               // This will happen if
               // a) everything goes well
               // b) app. exception was thrown
               if ()
                  .trace(JmsServerSession.this
                        + " commiting the JMS transaction tx=" + );
               .commit();
               // NO XASession? then manually commit. This is not so good but
               // it's the best we can do if we have no XASession.
               if ( == null
                     && .getActivation().isDeliveryTransacted())
               {
                  .commit();
               }
            } else
            {
               .suspend();
               if ( == null
                     && .getActivation().isDeliveryTransacted())
               {
                  .rollback();
               }
            }
         } catch (Throwable t)
         {
            .error(JmsServerSession.this + " failed to commit/rollback"t);
         }
      }
   }
New to GrepCode? Check out our FAQ X