Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2002-2013 the original author or authors.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.jadira.jms.template;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
 import org.springframework.jms.JmsException;
 import org.springframework.jms.connection.JmsResourceHolder;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.JmsUtils;
 import org.springframework.transaction.support.TransactionSynchronization;
 import org.springframework.transaction.support.TransactionSynchronizationAdapter;
 import org.springframework.transaction.support.TransactionSynchronizationManager;

BatchedJmsTemplate customises Spring's JmsTemplate with additional methods that enable multiple items to be processed in a single transaction for the various supported operations. The additional methods are identified by the suffix 'batch'.

As with BatchedMessageListenerContainer, Within each transaction, the first read is a blocking read, that blocks for JmsTemplate.setReceiveTimeout(long). Subsequent messages up to the maximum batch size setBatchSize(int) are read as non-blocking reads, with the batch completing as soon as the queue cannot service further messages.

Users of this class must handle rollback appropriately. A rollback triggered by failure processing a single message will cause all the messages in the transaction to rollback. It is recommended to design you message processing so that rollback only occurs for fatal, unexpected and unrecoverable errors such as a failure in the infrastructure. You should handle other errors by, for example, delivering messages directly to an error queue rather than throwing an exception. To assist in constructing this pattern, the AbstractMessageDriven POJO is also provided which provides the basic framework for implementing a MessageListener that is aligned with this contract.

NB. Due to the design and structure of Spring's DefaultMessageListenerContainer and its superclasses, implementing this class must by necessity duplicate certain parts of DefaultMessageListenerContainer. Consequently, this class has been managed at a source code level as a derivative of DefaultMessageListenerContainer and copyright messages and attributions reflect this.

Author(s):
Mark Pollack and Juergen Hoeller were the original authors of the JmsTemplate. Modifications to this class to enable batching were made by Chris Pheby.
 
 public class BatchedJmsTemplate extends JmsTemplate {
 
     private static final TransactionSynchronization BATCH_SYNCHRONIZATION = new BatchTransactionSynchronization();

    
Flag to indicate the start of a batch
 
     private static final ThreadLocal<BooleanIS_START_OF_BATCH = new ThreadLocal<Boolean>() {
         protected Boolean initialValue() {
             return .;
         }
     };

    
The default maximum size for a transactional batch
 
     public static final int DEFAULT_BATCH_SIZE = 150;

    
The configured maximum batch size, must be at least 1
 
     private int batchSize = ;

    
Creates a new instance
 
     public BatchedJmsTemplate() {
     }

    
Configures the maximum number of messages that can be read in a transaction

Parameters:
batchSize The maximum batch size
 
     public void setBatchSize(int batchSize) {
         this. = batchSize;
     }

    
Get the maximum number of messages that can be read in a transaction

Returns:
The maximum batch size
    public int getBatchSize() {
        return ;
    }

    
Receive a batch of up to default batch size for the default destination. Other than batching this method is the same as JmsTemplate.receive()

Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveBatch() throws JmsException {
        return receiveBatch();
    }

    
Receive a batch of up to batchSize. Other than batching this method is the same as JmsTemplate.receive()

Parameters:
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveBatch(int batchSizethrows JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            return receiveBatch(defaultDestinationbatchSize);
        } else {
            return receiveBatch(getRequiredDefaultDestinationName(), batchSize);
        }
    }

    
Receive a batch of up to default batch size for given destination. Other than batching this method is the same as JmsTemplate.receive(Destination)

Parameters:
destination The Destination
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveBatch(Destination destinationthrows JmsException {
        return receiveBatch(destination);
    }

    
Receive a batch of up to batchSize for given destination. Other than batching this method is the same as JmsTemplate.receive(Destination)

Parameters:
destination The Destination
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveBatch(final Destination destinationfinal int batchSizethrows JmsException {
        return execute(new SessionCallback<List<Message>>() {
            public List<Message> doInJms(Session sessionthrows JMSException {
                return doBatchReceive(sessiondestinationnullbatchSize);
            }
        }, true);
    }

    
Receive a batch of up to default batch size for given destinationName. Other than batching this method is the same as JmsTemplate.receive(String)

Parameters:
destinationName The Destination name
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveBatch(String destinationNamethrows JmsException {
        return receiveBatch(destinationNamegetBatchSize());
    }

    
Receive a batch of up to default batch size for given destination. Other than batching this method is the same as JmsTemplate.receive(String)

Parameters:
destinationName The Destination
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveBatch(final String destinationNamefinal int batchSizethrows JmsException {
        return execute(new SessionCallback<List<Message>>() {
            public List<Message> doInJms(Session sessionthrows JMSException {
                Destination destination = resolveDestinationName(sessiondestinationName);
                return doBatchReceive(sessiondestinationnullbatchSize);
            }
        }, true);
    }

    
Receive a batch of up to default batch size for default destination and given message selector. Other than batching this method is the same as JmsTemplate.receiveSelected(String)

Parameters:
messageSelector The Selector
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveSelectedBatch(String messageSelectorthrows JmsException {
        return receiveSelectedBatch(messageSelectorgetBatchSize());
    }

    
Receive a batch of up to batchSize for default destination and given message selector. Other than batching this method is the same as JmsTemplate.receiveSelected(String)

Parameters:
messageSelector The Selector
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveSelectedBatch(String messageSelectorint batchSizethrows JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            return receiveSelectedBatch(defaultDestinationmessageSelectorbatchSize);
        } else {
            return receiveSelectedBatch(getRequiredDefaultDestinationName(), messageSelectorbatchSize);
        }
    }

    
Receive a batch of up to default batch size for given destination and message selector. Other than batching this method is the same as JmsTemplate.receiveSelected(Destination, String)

Parameters:
destination The Destination
messageSelector The Selector
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveSelectedBatch(Destination destinationString messageSelectorthrows JmsException {
        return receiveSelectedBatch(destinationmessageSelectorgetBatchSize());
    }

    
Receive a batch of up to batchSize for given destination and message selector. Other than batching this method is the same as JmsTemplate.receiveSelected(Destination, String)

Parameters:
destination The Destination
messageSelector The Selector
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveSelectedBatch(final Destination destinationfinal String messageSelector,
            final int batchSizethrows JmsException {
        return execute(new SessionCallback<List<Message>>() {
            public List<Message> doInJms(Session sessionthrows JMSException {
                return doBatchReceive(sessiondestinationmessageSelectorbatchSize);
            }
        }, true);
    }

    
Receive a batch of up to default batch size for given destination name and message selector. Other than batching this method is the same as JmsTemplate.receiveSelected(String, String)

Parameters:
destinationName The destination name
messageSelector The Selector
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveSelectedBatch(String destinationNameString messageSelectorthrows JmsException {
        return receiveSelectedBatch(destinationNamemessageSelectorgetBatchSize());
    }

    
Receive a batch of up to batchSize for given destination name and message selector. Other than batching this method is the same as JmsTemplate.receiveSelected(String, String)

Parameters:
destinationName The destination name
messageSelector The Selector
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<Message> receiveSelectedBatch(final String destinationNamefinal String messageSelector,
            final int batchSizethrows JmsException {
        return execute(new SessionCallback<List<Message>>() {
            public List<Message> doInJms(Session sessionthrows JMSException {
                Destination destination = resolveDestinationName(sessiondestinationName);
                return doBatchReceive(sessiondestinationmessageSelectorbatchSize);
            }
        }, true);
    }

    
Receive a batch of up to default batch size for default destination and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveAndConvert()

Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveAndConvertBatch() throws JmsException {
        return receiveAndConvertBatch(getBatchSize());
    }

    
Receive a batch of up to batchSize for default destination and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveAndConvert()

Parameters:
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveAndConvertBatch(int batchSizethrows JmsException {
        List<Message> messages = receiveBatch(batchSize);
        List<Objectresult = new ArrayList<Object>(messages.size());
        for (Message next : messages) {
            result.add(doConvertFromMessage(next));
        }
        return result;
    }

    
Receive a batch of up to default batch size for the given Destination and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveAndConvert(Destination)

Parameters:
destination The Destination
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveAndConvertBatch(Destination destinationthrows JmsException {
        return receiveAndConvertBatch(destinationgetBatchSize());
    }

    
Receive a batch of up to batchSize for given Destination and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveAndConvert(Destination)

Parameters:
destination The Destination
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveAndConvertBatch(Destination destinationint batchSizethrows JmsException {
        List<Message> messages = receiveBatch(destinationbatchSize);
        List<Objectresult = new ArrayList<Object>(messages.size());
        for (Message next : messages) {
            result.add(doConvertFromMessage(next));
        }
        return result;
    }

    
Receive a batch of up to default batch size for given destination name and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveAndConvert(String)

Parameters:
destinationName The destination name
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveAndConvertBatch(String destinationNamethrows JmsException {
        return receiveAndConvertBatch(destinationNamegetBatchSize());
    }

    
Receive a batch of up to batchSize for given destination name and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveAndConvert(String)

Parameters:
destinationName The destination name
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveAndConvertBatch(String destinationNameint batchSizethrows JmsException {
        List<Message> messages = receiveBatch(destinationNamebatchSize);
        List<Objectresult = new ArrayList<Object>(messages.size());
        for (Message next : messages) {
            result.add(doConvertFromMessage(next));
        }
        return result;
    }

    
Receive a batch of up to default batch size for default destination and message selector and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveSelectedAndConvert(String)

Parameters:
messageSelector The Selector
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveSelectedAndConvertBatch(String messageSelectorthrows JmsException {
        return receiveSelectedAndConvertBatch(messageSelectorgetBatchSize());
    }

    
Receive a batch of up to batchSize for default destination and message selector and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveSelectedAndConvert(String)

Parameters:
messageSelector The Selector
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveSelectedAndConvertBatch(String messageSelectorint batchSizethrows JmsException {
        List<Message> messages = receiveSelectedBatch(messageSelectorbatchSize);
        List<Objectresult = new ArrayList<Object>(messages.size());
        for (Message next : messages) {
            result.add(doConvertFromMessage(next));
        }
        return result;
    }

    
Receive a batch of up to default batch size for given Destination and message selector and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveSelectedAndConvert(Destination, String)

Parameters:
destination The Destination
messageSelector The Selector
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveSelectedAndConvertBatch(Destination destinationString messageSelector)
            throws JmsException {
        return receiveSelectedAndConvertBatch(destinationmessageSelectorgetBatchSize());
    }

    
Receive a batch of up to batchSize for given Destination and message selector and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveSelectedAndConvert(Destination, String)

Parameters:
destination The Destination
messageSelector The Selector
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveSelectedAndConvertBatch(Destination destinationString messageSelectorint batchSize)
            throws JmsException {
        List<Message> messages = receiveSelectedBatch(destinationmessageSelectorbatchSize);
        List<Objectresult = new ArrayList<Object>(messages.size());
        for (Message next : messages) {
            result.add(doConvertFromMessage(next));
        }
        return result;
    }

    
Receive a batch of up to default batch size for given destination name and message selector and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveSelectedAndConvert(String, String)

Parameters:
destinationName The destination name
messageSelector The Selector
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveSelectedAndConvertBatch(String destinationNameString messageSelector)
            throws JmsException {
        return receiveSelectedAndConvertBatch(destinationNamemessageSelectorgetBatchSize());
    }

    
Receive a batch of up to batchSize for given destination name and message selector and convert each message in the batch. Other than batching this method is the same as JmsTemplate.receiveSelectedAndConvert(String, String)

Parameters:
destinationName The destination name
messageSelector The Selector
batchSize The batch size
Returns:
A list of Message
Throws:
JmsException The JmsException
    public List<ObjectreceiveSelectedAndConvertBatch(String destinationNameString messageSelectorint batchSize)
            throws JmsException {
        List<Message> messages = receiveSelectedBatch(destinationNamebatchSize);
        List<Objectresult = new ArrayList<Object>(messages.size());
        for (Message next : messages) {
            result.add(doConvertFromMessage(next));
        }
        return result;
    }

    
    @Override
    public Message receive() throws JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            return receive(defaultDestination);
        } else {
            return receive(getRequiredDefaultDestinationName());
        }
    }

    
    @Override
    public Message receive(final Destination destinationthrows JmsException {
        return execute(new SessionCallback<Message>() {
            public Message doInJms(Session sessionthrows JMSException {
                return doSingleReceive(sessiondestinationnull);
            }
        }, true);
    }

    
    @Override
    public Message receive(final String destinationNamethrows JmsException {
        return execute(new SessionCallback<Message>() {
            public Message doInJms(Session sessionthrows JMSException {
                Destination destination = resolveDestinationName(sessiondestinationName);
                return doSingleReceive(sessiondestinationnull);
            }
        }, true);
    }

    
    @Override
    public Message receiveSelected(String messageSelectorthrows JmsException {
        Destination defaultDestination = getDefaultDestination();
        if (defaultDestination != null) {
            return receiveSelected(defaultDestinationmessageSelector);
        } else {
            return receiveSelected(getRequiredDefaultDestinationName(), messageSelector);
        }
    }

    
    @Override
    public Message receiveSelected(final Destination destinationfinal String messageSelectorthrows JmsException {
        return execute(new SessionCallback<Message>() {
            public Message doInJms(Session sessionthrows JMSException {
                return doSingleReceive(sessiondestinationmessageSelector);
            }
        }, true);
    }

    
    @Override
    public Message receiveSelected(final String destinationNamefinal String messageSelectorthrows JmsException {
        return execute(new SessionCallback<Message>() {
            public Message doInJms(Session sessionthrows JMSException {
                Destination destination = resolveDestinationName(sessiondestinationName);
                return doSingleReceive(sessiondestinationmessageSelector);
            }
        }, true);
    }

    
Method replicates the simple logic from JmsTemplate#getRequiredDefaultDestinationName which is private and therefore cannot be accessed from this class

Returns:
The default destination name
Throws:
IllegalStateException Example if no destination or destination name is specified
        String name = getDefaultDestinationName();
        if (name == null) {
            throw new IllegalStateException(
                    "No 'defaultDestination' or 'defaultDestinationName' specified. Check configuration of JmsTemplate.");
        }
        return name;
    }

    
Method replicates the simple logic from JmsTemplate#doReceive(MessageConsumer, long) which is private and therefore cannot be accessed from this class

Parameters:
consumer The consumer to use
timeout The timeout to apply
Returns:
A Message
Throws:
JMSException Indicates an error occurred
    private Message doReceive(MessageConsumer consumerlong timeoutthrows JMSException {
        if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
            return consumer.receiveNoWait();
        } else if (timeout > 0) {
            return consumer.receive(timeout);
        } else {
            return consumer.receive();
        }
    }
    protected List<Message> doBatchReceive(Session session, Destination destinationString messageSelector,
            int batchSizethrows JMSException {
        return doBatchReceive(session, createConsumer(sessiondestinationmessageSelector), batchSize);
    }
    protected List<Message> doBatchReceive(Session session, MessageConsumer consumerint batchSize)
            throws JMSException {
        try {
            final List<Message> result;
            long timeout = determineTimeout();
            Message message = doReceive(consumertimeout);
            if (message == null) {
                result = new ArrayList<Message>(0);
            } else {
                result = new ArrayList<Message>(batchSize);
                result.add(message);
                for (int i = 1; i < batchSizei++) {
                    message = doReceive(consumer, RECEIVE_TIMEOUT_NO_WAIT);
                    if (message == null) {
                        break;
                    }
                    result.add(message);
                }
            }
            if (session.getTransacted()) {
                if (isSessionLocallyTransacted(session)) {
                    JmsUtils.commitIfNecessary(session);
                }
            } else if (isClientAcknowledge(session)) {
                if (message != null) {
                    message.acknowledge();
                }
            }
            return result;
        } finally {
            JmsUtils.closeMessageConsumer(consumer);
        }
    }
    protected Message doSingleReceive(Session session, Destination destinationString messageSelector)
            throws JMSException {
        return doSingleReceive(session, createConsumer(sessiondestinationmessageSelector));
    }
    protected Message doSingleReceive(Session session, MessageConsumer consumerthrows JMSException {
        if (!session.getTransacted() || isSessionLocallyTransacted(session)) {
            // If we are not using JTA we should use standard JmsTemplate behaviour
            return super.doReceive(sessionconsumer);
        }
        // Otherwise batching - the batch can span multiple receive() calls, until you commit the
        // batch
        try {
            final Message message;
            if (..equals(.get())) {
                // Register Synchronization
                TransactionSynchronizationManager.registerSynchronization();
                // Use transaction timeout (if available).
                long timeout = determineTimeout();
                message = doReceive(consumertimeout);
                .set(.);
            } else {
                message = doReceive(consumer, RECEIVE_TIMEOUT_NO_WAIT);
            }
            if (isClientAcknowledge(session)) {
                // Manually acknowledge message, if any.
                if (message != null) {
                    message.acknowledge();
                }
            }
            return message;
        } finally {
            JmsUtils.closeMessageConsumer(consumer);
        }
    }

Determines receive timeout, using logic equivalent to that of @return The timeout determined
    private long determineTimeout() {
        long timeout = getReceiveTimeout();
        JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager
                .getResource(getConnectionFactory());
        if (resourceHolder != null && resourceHolder.hasTimeout()) {
            timeout = Math.min(timeoutresourceHolder.getTimeToLiveInMillis());
        }
        return timeout;
    }

    
A simple TransactionSynchronization implementation that resets the batch indicator so that the next read begins a new batch
    private static class BatchTransactionSynchronization extends TransactionSynchronizationAdapter {
        @Override
        public void afterCompletion(int status) {
            .set(.);
        }
    }
New to GrepCode? Check out our FAQ X