Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  http://aws.amazon.com/apache2.0
   *
  * or in the "license" file accompanying this file. This file is distributed
  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
 
 package com.amazonaws.services.sqs.buffered;
 
 import java.util.List;
 import java.util.Map;
 
AmazonSQSBufferedAsyncClient provides client-side batching of outgoing sendMessage, deleteMessage and changeMessageVisibility calls.
After receiving a call, rather than executing it right away, this client waits for a configurable period of time ( default=200ms) for other calls of the same type to come in; if such calls do come in, they are also not executed immediately, but instead are added to the batch. When the batch becomes full or the timeout period expires, the entire batch is executed at once and the results are returned to the callers. This method of operation leads to reduced operating costs (since SQS charges per call and fewer calls are made) and increased overall throughput (since more work is performed per call, and all fixed costs of making a call are amortized over a greater amount of work). The cost of this method is increased latency for individual calls, since calls spend some time waiting on the client side for the potential batch-mates to appear before they are actually executed.
This client also performs pre-fetching of messages from SQS. After the first receiveMessage call is made, the client attempts not only to satisfy that call, but also pre-fetch extra messages to store in a temporary buffer. Future receiveMessage calls will be satisfied from the buffer, and only if the buffer is empty will the calling thread have to wait for the messages to be fetched. The size of the buffer and the maximum number of threads used for prefetching are configurable.
AmazonSQSBufferedAsyncClient is thread-safe.
 
 public class AmazonSQSBufferedAsyncClient implements AmazonSQSAsync {
 
     public static final String USER_AGENT = AmazonSQSBufferedAsyncClient.class.getSimpleName() + "/"
             + VersionInfoUtils.getVersion();
 
     private final CachingMap buffers = new CachingMap(16, (float) 0.75, true);
     private final AmazonSQSAsync realSQS;
     private final QueueBufferConfig bufferConfigExemplar;
 
     public AmazonSQSBufferedAsyncClient(AmazonSQSAsync paramRealSQS) {
         this(paramRealSQSnew QueueBufferConfig());
     }
 
     // route all future constructors to the most general one, because validation
     // happens here
     public AmazonSQSBufferedAsyncClient(AmazonSQSAsync paramRealSQSQueueBufferConfig config) {
         config.validate();
          = paramRealSQS;
         = config;
    }
    /*
     * (non-Javadoc)
     * @see com.amazonaws.services.sqs.AmazonSQS#setRegion(com.amazonaws.regions.Region)
     */
    @Override
    public void setRegion(Region regionthrows IllegalArgumentException {
        .setRegion(region);
    }
    public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(setQueueAttributesRequest);
        .setQueueAttributes(setQueueAttributesRequest);
    }
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(changeMessageVisibilityBatchRequest);
        return .changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest);
    }
    public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(changeMessageVisibilityRequest);
        QueueBuffer buffer = getQBuffer(changeMessageVisibilityRequest.getQueueUrl());
        buffer.changeMessageVisibilitySync(changeMessageVisibilityRequest);
    }
    public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(sendMessageBatchRequest);
        return .sendMessageBatch(sendMessageBatchRequest);
    }
    public SendMessageResult sendMessage(SendMessageRequest sendMessageRequestthrows AmazonServiceException,
            AmazonClientException {
        QueueBuffer buffer = getQBuffer(sendMessageRequest.getQueueUrl());
        ResultConverter.appendUserAgent(sendMessageRequest);
        return buffer.sendMessageSync(sendMessageRequest);
    }
    public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(receiveMessageRequest);
        QueueBuffer buffer = getQBuffer(receiveMessageRequest.getQueueUrl());
        return buffer.receiveMessageSync(receiveMessageRequest);
    }
    public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(deleteMessageBatchRequest);
        return .deleteMessageBatch(deleteMessageBatchRequest);
    }
    public void deleteMessage(DeleteMessageRequest deleteMessageRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(deleteMessageRequest);
        QueueBuffer buffer = getQBuffer(deleteMessageRequest.getQueueUrl());
        buffer.deleteMessageSync(deleteMessageRequest);
    }
    public void shutdown() {
        for (QueueBuffer buffer : .values()) {
            buffer.shutdown();
        }
        .shutdown();
    }
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(changeMessageVisibilityBatchRequest);
        return .changeMessageVisibilityBatchAsync(changeMessageVisibilityBatchRequest);
    }
    public Future<VoidchangeMessageVisibilityAsync(ChangeMessageVisibilityRequest changeMessageVisibilityRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(changeMessageVisibilityRequest);
        QueueBuffer buffer = getQBuffer(changeMessageVisibilityRequest.getQueueUrl());
        return buffer.changeMessageVisibility(changeMessageVisibilityRequestnull);
    }
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(sendMessageBatchRequest);
        return .sendMessageBatchAsync(sendMessageBatchRequest);
    }
    public Future<SendMessageResultsendMessageAsync(SendMessageRequest sendMessageRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(sendMessageRequest);
        QueueBuffer buffer = getQBuffer(sendMessageRequest.getQueueUrl());
        return buffer.sendMessage(sendMessageRequestnull);
    }
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(receiveMessageRequest);
        QueueBuffer buffer = getQBuffer(receiveMessageRequest.getQueueUrl());
        return buffer.receiveMessage(receiveMessageRequestnull);
    }
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(deleteMessageBatchRequest);
        return .deleteMessageBatchAsync(deleteMessageBatchRequest);
    }
    public void setEndpoint(String endpointthrows IllegalArgumentException {
        .setEndpoint(endpoint);
    }
    public Future<VoidsetQueueAttributesAsync(SetQueueAttributesRequest setQueueAttributesRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(setQueueAttributesRequest);
        return .setQueueAttributesAsync(setQueueAttributesRequest);
    }
    public Future<GetQueueUrlResultgetQueueUrlAsync(GetQueueUrlRequest getQueueUrlRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(getQueueUrlRequest);
        return .getQueueUrlAsync(getQueueUrlRequest);
    }
    public Future<VoidremovePermissionAsync(RemovePermissionRequest removePermissionRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(removePermissionRequest);
        return .removePermissionAsync(removePermissionRequest);
    }
    public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest getQueueUrlRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(getQueueUrlRequest);
        return .getQueueUrl(getQueueUrlRequest);
    }
    public void removePermission(RemovePermissionRequest removePermissionRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(removePermissionRequest);
        .removePermission(removePermissionRequest);
    }
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(getQueueAttributesRequest);
        return .getQueueAttributesAsync(getQueueAttributesRequest);
    }
    public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(getQueueAttributesRequest);
        return .getQueueAttributes(getQueueAttributesRequest);
    }
    public Future<VoidpurgeQueueAsync(PurgeQueueRequest purgeQueueRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(purgeQueueRequest);
        return .purgeQueueAsync(purgeQueueRequest);
    }
    public void purgeQueue(PurgeQueueRequest purgeQueueRequestthrows AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(purgeQueueRequest);
        .purgeQueue(purgeQueueRequest);
    }
    public Future<VoiddeleteQueueAsync(DeleteQueueRequest deleteQueueRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(deleteQueueRequest);
        return .deleteQueueAsync(deleteQueueRequest);
    }
    public void deleteQueue(DeleteQueueRequest deleteQueueRequestthrows AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(deleteQueueRequest);
        .deleteQueue(deleteQueueRequest);
    }
            AmazonClientException {
        ResultConverter.appendUserAgent(listQueuesRequest);
        return .listQueuesAsync(listQueuesRequest);
    }
    public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(listQueuesRequest);
        return .listQueues(listQueuesRequest);
    }
    public Future<CreateQueueResultcreateQueueAsync(CreateQueueRequest createQueueRequest)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(createQueueRequest);
        return .createQueueAsync(createQueueRequest);
    }
    public CreateQueueResult createQueue(CreateQueueRequest createQueueRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(createQueueRequest);
        return .createQueue(createQueueRequest);
    }
    public Future<VoidaddPermissionAsync(AddPermissionRequest addPermissionRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(addPermissionRequest);
        return .addPermissionAsync(addPermissionRequest);
    }
    public void addPermission(AddPermissionRequest addPermissionRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(addPermissionRequest);
        .addPermission(addPermissionRequest);
    }
        return .listQueues();
    }
        ResultConverter.appendUserAgent(request);
        return .getCachedResponseMetadata(request);
    }
    public Future<VoiddeleteMessageAsync(DeleteMessageRequest deleteMessageRequestthrows AmazonServiceException,
            AmazonClientException {
        ResultConverter.appendUserAgent(deleteMessageRequest);
        QueueBuffer buffer = getQBuffer(deleteMessageRequest.getQueueUrl());
        return buffer.deleteMessage(deleteMessageRequestnull);
    }

    
Returns (creating it if necessary) a queue buffer for a particular queue Since we are only storing a limited number of queue buffers, it is possible that as a result of calling this method the least recently used queue buffer will be removed from our queue buffer cache

Returns:
a queue buffer associated with the provided queue URL. Never null
    private synchronized QueueBuffer getQBuffer(String qUrl) {
        QueueBuffer toReturn = .get(qUrl);
        if (null == toReturn) {
            QueueBufferConfig config = new QueueBufferConfig();
            toReturn = new QueueBuffer(configqUrl);
            .put(qUrltoReturn);
        }
        return toReturn;
    }
    class CachingMap extends LinkedHashMap<StringQueueBuffer> {
        private static final long serialVersionUID = 1;
        private static final int MAX_ENTRIES = 100;
        public CachingMap(int initialfloat loadFactorboolean accessOrder) {
            super(initialloadFactoraccessOrder);
        }
        protected boolean removeEldestEntry(java.util.Map.Entry<StringQueueBuffereldest) {
            return size() > ;
        }
    }
    public Future<VoidchangeMessageVisibilityAsync(ChangeMessageVisibilityRequest changeMessageVisibilityRequest,
                                                     AsyncHandler<ChangeMessageVisibilityRequestVoidasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(changeMessageVisibilityRequest);
        QueueBuffer buffer = getQBuffer(changeMessageVisibilityRequest.getQueueUrl());
        return buffer.changeMessageVisibility(changeMessageVisibilityRequestasyncHandler);
    }
    public Future<SendMessageResultsendMessageAsync(SendMessageRequest sendMessageRequest,
                                                      AsyncHandler<SendMessageRequestSendMessageResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(sendMessageRequest);
        QueueBuffer buffer = getQBuffer(sendMessageRequest.getQueueUrl());
        return buffer.sendMessage(sendMessageRequestasyncHandler);
    }
                                                            AsyncHandler<ReceiveMessageRequestReceiveMessageResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(receiveMessageRequest);
        QueueBuffer buffer = getQBuffer(receiveMessageRequest.getQueueUrl());
        return buffer.receiveMessage(receiveMessageRequestasyncHandler);
    }
    public Future<VoiddeleteMessageAsync(DeleteMessageRequest deleteMessageRequest,
                                           AsyncHandler<DeleteMessageRequestVoidasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(deleteMessageRequest);
        QueueBuffer buffer = getQBuffer(deleteMessageRequest.getQueueUrl());
        return buffer.deleteMessage(deleteMessageRequestasyncHandler);
    }
    public Future<VoidsetQueueAttributesAsync(SetQueueAttributesRequest setQueueAttributesRequest,
                                                AsyncHandler<SetQueueAttributesRequestVoidasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .setQueueAttributesAsync(setQueueAttributesRequestasyncHandler);
    }
                                                                                        AsyncHandler<ChangeMessageVisibilityBatchRequestChangeMessageVisibilityBatchResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .changeMessageVisibilityBatchAsync(changeMessageVisibilityBatchRequestasyncHandler);
    }
    public Future<GetQueueUrlResultgetQueueUrlAsync(GetQueueUrlRequest getQueueUrlRequest,
                                                      AsyncHandler<GetQueueUrlRequestGetQueueUrlResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .getQueueUrlAsync(getQueueUrlRequestasyncHandler);
    }
    public Future<VoidremovePermissionAsync(RemovePermissionRequest removePermissionRequest,
                                              AsyncHandler<RemovePermissionRequestVoidasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .removePermissionAsync(removePermissionRequestasyncHandler);
    }
                                                                    AsyncHandler<GetQueueAttributesRequestGetQueueAttributesResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .getQueueAttributesAsync(getQueueAttributesRequestasyncHandler);
    }
                                                                AsyncHandler<SendMessageBatchRequestSendMessageBatchResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .sendMessageBatchAsync(sendMessageBatchRequestasyncHandler);
    }
    public Future<VoidpurgeQueueAsync(PurgeQueueRequest purgeQueueRequest,
                                        AsyncHandler<PurgeQueueRequestVoidasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .purgeQueueAsync(purgeQueueRequestasyncHandler);
    }
    public Future<VoiddeleteQueueAsync(DeleteQueueRequest deleteQueueRequest,
                                         AsyncHandler<DeleteQueueRequestVoidasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .deleteQueueAsync(deleteQueueRequestasyncHandler);
    }
    public Future<ListQueuesResultlistQueuesAsync(ListQueuesRequest listQueuesRequest,
                                                    AsyncHandler<ListQueuesRequestListQueuesResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .listQueuesAsync(listQueuesRequestasyncHandler);
    }
                                                                    AsyncHandler<DeleteMessageBatchRequestDeleteMessageBatchResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .deleteMessageBatchAsync(deleteMessageBatchRequestasyncHandler);
    }
    public Future<CreateQueueResultcreateQueueAsync(CreateQueueRequest createQueueRequest,
                                                      AsyncHandler<CreateQueueRequestCreateQueueResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .createQueueAsync(createQueueRequestasyncHandler);
    }
    public Future<VoidaddPermissionAsync(AddPermissionRequest addPermissionRequest,
                                           AsyncHandler<AddPermissionRequestVoidasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .addPermissionAsync(addPermissionRequestasyncHandler);
    }
    @Override
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(listDeadLetterSourceQueuesRequest);
        return .listDeadLetterSourceQueues(listDeadLetterSourceQueuesRequest);
    }
    @Override
            throws AmazonServiceExceptionAmazonClientException {
        ResultConverter.appendUserAgent(listDeadLetterSourceQueuesRequest);
        return .listDeadLetterSourceQueuesAsync(listDeadLetterSourceQueuesRequest);
    }
    @Override
                                                                                    AsyncHandler<ListDeadLetterSourceQueuesRequestListDeadLetterSourceQueuesResultasyncHandler)
            throws AmazonServiceExceptionAmazonClientException {
        return .listDeadLetterSourceQueuesAsync(listDeadLetterSourceQueuesRequestasyncHandler);
    }
    @Override
    public void setQueueAttributes(String queueUrlMap<StringStringattributesthrows AmazonServiceException,
            AmazonClientException {
        setQueueAttributes(new SetQueueAttributesRequest(queueUrlattributes));
    }
    @Override
                                                                           List<ChangeMessageVisibilityBatchRequestEntryentries)
            throws AmazonServiceExceptionAmazonClientException {
        return changeMessageVisibilityBatch(new ChangeMessageVisibilityBatchRequest(queueUrlentries));
    }
    @Override
    public void changeMessageVisibility(String queueUrlString receiptHandleInteger visibilityTimeout)
            throws AmazonServiceExceptionAmazonClientException {
        changeMessageVisibility(new ChangeMessageVisibilityRequest(queueUrlreceiptHandlevisibilityTimeout));
    }
    @Override
        return getQueueUrl(new GetQueueUrlRequest(queueName));
    }
    @Override
    public void removePermission(String queueUrlString labelthrows AmazonServiceExceptionAmazonClientException {
        removePermission(new RemovePermissionRequest(queueUrllabel));
    }
    @Override
            throws AmazonServiceExceptionAmazonClientException {
        return sendMessageBatch(new SendMessageBatchRequest(queueUrlentries));
    }
    @Override
    public void deleteQueue(String queueUrlthrows AmazonServiceExceptionAmazonClientException {
        deleteQueue(new DeleteQueueRequest(queueUrl));
    }
    @Override
    public SendMessageResult sendMessage(String queueUrlString messageBodythrows AmazonServiceException,
            AmazonClientException {
        return sendMessage(new SendMessageRequest(queueUrlmessageBody));
    }
    @Override
        return receiveMessage(new ReceiveMessageRequest(queueUrl));
    }
    @Override
    public ListQueuesResult listQueues(String queueNamePrefixthrows AmazonServiceExceptionAmazonClientException {
        return listQueues(new ListQueuesRequest(queueNamePrefix));
    }
    @Override
            throws AmazonServiceExceptionAmazonClientException {
        return deleteMessageBatch(new DeleteMessageBatchRequest(queueUrlentries));
    }
    @Override
        return createQueue(new CreateQueueRequest(queueName));
    }
    @Override
    public void addPermission(String queueUrlString labelList<StringaWSAccountIdsList<Stringactions)
            throws AmazonServiceExceptionAmazonClientException {
        addPermission(new AddPermissionRequest(queueUrllabelaWSAccountIdsactions));
    }
    @Override
    public void deleteMessage(String queueUrlString receiptHandlethrows AmazonServiceException,
            AmazonClientException {
        deleteMessage(new DeleteMessageRequest(queueUrlreceiptHandle));
    }
    @Override
    public GetQueueAttributesResult getQueueAttributes(String queueUrlList<StringattributeNames) {
        return getQueueAttributes(new GetQueueAttributesRequest(queueUrlattributeNames));
    }
New to GrepCode? Check out our FAQ X