Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  http://aws.amazon.com/apache2.0
   *
  * or in the "license" file accompanying this file. This file is distributed
  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
 package com.amazonaws.services.sqs.buffered;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonWebServiceRequest;
 import com.amazonaws.services.sqs.AmazonSQS;
 import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
 import com.amazonaws.services.sqs.model.ChangeMessageVisibilityBatchRequest;
 import com.amazonaws.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
 import com.amazonaws.services.sqs.model.ChangeMessageVisibilityBatchResult;
 import com.amazonaws.services.sqs.model.ChangeMessageVisibilityBatchResultEntry;
 import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;
 import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
 import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
 import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
 import com.amazonaws.services.sqs.model.DeleteMessageBatchResultEntry;
 import com.amazonaws.services.sqs.model.DeleteMessageRequest;
 import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
 import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
 import com.amazonaws.services.sqs.model.SendMessageBatchResult;
 import com.amazonaws.services.sqs.model.SendMessageBatchResultEntry;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.amazonaws.services.sqs.model.SendMessageResult;


This class is responsible for buffering outgoing SQS requests, i.e. requests to send a message, delete a message and change the visibility of the message.
When a request arrives, the buffer adds the message to a message batch of an appropriate type (creating such a batch if there currently isn't one outstanding). When the outstanding batch becomes full, or when a configurable timeout expires, the buffer makes a call to SQS to execute the current batch.
Internally, the batch objects maintain a list of futures corresponding to the requests added to them. When a batch completes, it loads the results into the futures and marks the futures as complete.
 
 public class SendQueueBuffer {
     private static Log log = LogFactory.getLog(SendQueueBuffer.class);
 
     // Interface to support event notifications with a parameter.
     private interface Listener<T> {
         void invoke(T o);
     };

    
Config settings for this buffer
 
     private final QueueBufferConfig config;

    
Url of our queue
 
     private final String qUrl;

    
The AmazonSQS client to use for this buffer's operations.
 
     private final AmazonSQS sqsClient;

    
The executor service for the batching tasks.
 
     private final Executor executor;

    
Object used to serialize sendMessage calls.
 
     private final Object sendMessageLock = new Object();

    
Object used to serialize deleteMessage calls.
 
     private final Object deleteMessageLock = new Object();

    
Object used to serialize changeMessageVisibility calls.
 
     private final Object changeMessageVisibilityLock = new Object();

    
Current batching task for sendMessage. Using a size 1 array to allow "passing by reference". Synchronized by sendMessageLock.
    private final SendMessageBatchTask[] openSendMessageBatchTask = new SendMessageBatchTask[1];

    
Current batching task for deleteMessage. Using a size 1 array to allow "passing by reference". Synchronized by deleteMessageLock.
    private final DeleteMessageBatchTask[] openDeleteMessageBatchTask = new DeleteMessageBatchTask[1];

    
Current batching task for changeMessageVisibility. Using a size 1 array to allow "passing by reference". Synchronized by changeMessageVisibilityLock.
Permits controlling the number of in flight SendMessage batches.
    private final Semaphore inflightSendMessageBatches;

    
Permits controlling the number of in flight DeleteMessage batches.
    private final Semaphore inflightDeleteMessageBatches;

    
Permits controlling the number of in flight ChangeMessageVisibility batches.
    SendQueueBuffer(AmazonSQS sqsClientExecutor executorQueueBufferConfig paramConfigString url) {
        this. = sqsClient;
        this. = executor;
        this. = paramConfig;
         = url;
        int maxBatch = .getMaxInflightOutboundBatches();
        //must allow at least one outbound batch.
        maxBatch = maxBatch > 0 ? maxBatch : 1;
        this. = new SemaphoremaxBatch);
        this. = new SemaphoremaxBatch);
        this. = new SemaphoremaxBatch);
    }
    public QueueBufferConfig getConfig() {
        return ;
    }

      

Returns:
never null
      public QueueBufferFuture< SendMessageRequest, SendMessageResult > sendMessage( SendMessageRequest requestQueueBufferCallback<SendMessageRequest, SendMessageResult> callback)
      {
          QueueBufferFuture<SendMessageRequest, SendMessageResult>  result =
                  submitOutboundRequest(requestcallback);
          return result;
      }

      

Returns:
never null
          return submitOutboundRequest(,
                  ,
                  request,
                  ,
                  callback);
      }

      

Returns:
never null
          return submitOutboundRequest(
                ,
                ,
                request,
                ,
                callback);
      }

    

Returns:
new OutboundBatchTask of appropriate type, never null
    @SuppressWarnings("unchecked")
    private <R extends AmazonWebServiceRequest, Result> OutboundBatchTask<R, Result> newOutboundBatchTask(
            R request) {
        if (request instanceof SendMessageRequest)
            return (OutboundBatchTask<R, Result>) new SendMessageBatchTask();
        else if (request instanceof DeleteMessageRequest)
            return (OutboundBatchTask<R, Result>) new DeleteMessageBatchTask();
        else if (request instanceof ChangeMessageVisibilityRequest)
            return (OutboundBatchTask<R, Result>) new ChangeMessageVisibilityBatchTask();
        else
            // this should never happen
            throw new IllegalArgumentException("Unsupported request type " + request.getClass().getName());
    }

    
Flushes all outstanding outbound requests (SendMessage, DeleteMessage, ChangeMessageVisibility) in this buffer.

The call returns successfully when all outstanding outbound requests submitted before the call are completed (i.e. processed by SQS).

    public void flush() {
        try {
            synchronized () {
                
                        .acquire(.getMaxInflightOutboundBatches());
                
                        .release(.getMaxInflightOutboundBatches());
            }
            synchronized () {
                
                        .acquire(.getMaxInflightOutboundBatches());
                
                        .release(.getMaxInflightOutboundBatches());
            }
            synchronized () {
                
                        .acquire(.getMaxInflightOutboundBatches());
                
                        .release(.getMaxInflightOutboundBatches());
            }
        } catchInterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    
Submits an outbound request for delivery to the queue associated with this buffer.

Parameters:
operationLock the lock synchronizing calls for the call type ( sendMessage, deleteMessage, changeMessageVisibility )
openOutboundBatchTask the open batch task for this call type
request the request to submit
inflightOperationBatches the permits controlling the batches for this type of request
Returns:
never null
Throws:
AmazonClientException (see the various outbound calls for details)
    @SuppressWarnings("unchecked")
    <OBT extends OutboundBatchTask<R, Result>, R extends AmazonWebServiceRequest, Result> QueueBufferFuture<R, Result> submitOutboundRequest(
            Object operationLock,
            OBT[] openOutboundBatchTask,
            R request,
            final Semaphore inflightOperationBatches,
            QueueBufferCallback<R, Result> callback) {
        /*
         * Callers add requests to a single batch task (openOutboundBatchTask)
         * until it is full or maxBatchOpenMs elapses. The total number of batch
         * task in flight is controlled by the inflightOperationBatch semaphore
         * capped at maxInflightOutboundBatches.
         */
        QueueBufferFuture<R, Result> theFuture = null;
        try {
            synchronized (operationLock) {
                if (openOutboundBatchTask[0] == null || ((theFuture = openOutboundBatchTask[0].addRequest(requestcallback))) == null) {
                    OBT obt = (OBT) newOutboundBatchTask(request);
                    inflightOperationBatches.acquire();
                    openOutboundBatchTask[0] = obt;
                    // Register a listener for the event signaling that the
                    // batch task has completed (successfully or not).
                    openOutboundBatchTask[0]. = new Listener<OutboundBatchTask<R, Result>>() {
                        public void invoke(OutboundBatchTask<R, Result> task) {
                            inflightOperationBatches.release();
                        }
                    };
                    if ( .isTraceEnabled() ) {
                        .trace("Queue " +  + " created new batch for " + request.getClass().toString()
                                + " " + inflightOperationBatches.availablePermits()
                                + " free slots remain");
                    }
                    theFuture = openOutboundBatchTask[0].addRequest(requestcallback);
                    .execute(openOutboundBatchTask[0]);
                    if ( null == theFuture ) {
                        //this can happen only if the request itself is flawed,
                        //so that it can't be added to any batch, even a brand
                        //new one
                        throw new AmazonClientException("Failed to schedule request "request + " for execution" );
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            AmazonClientException toThrow = new AmazonClientException("Interrupted while waiting for lock.");
            toThrow.initCause(e);
            throw toThrow;
        }
        return theFuture;
    }


    
Task to send a batch of outbound requests to SQS.

The batch task is constructed open and accepts requests until full, or until maxBatchOpenMs elapses. At that point, the batch closes and the collected requests are assembled into a single batch request to SQS. Specialized for each type of outbound request.

Instances of this class (and subclasses) are thread-safe.

Parameters:
<R> the type of the SQS request to batch
<Result> the type of result the futures issued by this task will return
    private abstract class OutboundBatchTask<R extends AmazonWebServiceRequest, Result> implements Runnable {
        final List<R> requests;
        final ArrayList<QueueBufferFuture<R, Result>> futures;
        AtomicBoolean open = new AtomicBoolean(true);
        volatile Listener<OutboundBatchTask<R, Result>> onCompleted = null;
        OutboundBatchTask() {
             = new ArrayList<R>(.getMaxBatchSize());
             = new ArrayList<QueueBufferFuture<R, Result>>(.getMaxBatchSize());
        }

        
Adds a request to the batch if it is still open and has capacity.

Returns:
the future that can be used to get the results of the execution, or null if the addition failed.
        synchronized QueueBufferFuture<R, Result> addRequest(R requestQueueBufferCallback<R, Result> callback) {
            if (!.get())
                return null;
            QueueBufferFuture<R, Result> theFuture = addIfAllowed(requestcallback);
            // if the addition did not work, or this addition made us full,
            // we can close the request
            if ((null == theFuture) || isFull()) {
                .set(false);
            }
            // the batch request is as full as it will ever be. no need to wait
            // for the timeout, we can run it now.
            if (!.get())
                notify();
            return theFuture;
        }

        
Adds the request to the batch if capacity allows it.

Parameters:
request
Returns:
the future that will be signaled when the request is completed and can be used to retrieve the result. Can be null if the addition could not be done
        synchronized QueueBufferFuture<R, Result> addIfAllowed(R requestQueueBufferCallback<R,Result> callback) {
            if (isOkToAdd(request)) {
                .add(request);
                QueueBufferFuture<R, Result> theFuture = new QueueBufferFuture<R, Result>(callback);
                .add(theFuture);
                onRequestAdded(request);
                return theFuture;
            } else
                return null;
        }
        protected synchronized boolean isOkToAdd(R request) {
            return .size() < .getMaxBatchSize();
        }
        protected synchronized void onRequestAdded(R request) {
            // to be overridden by subclasses
        }

        

Returns:
whether the buffer is filled to capacity
        synchronized boolean isFull() {
            return .size() >= .getMaxBatchSize();
        }

        
Processes the batch once closed.
        abstract void process();
        @Override
        public synchronized void run() {
            try {
                long deadlineMs = ..convert(System.nanoTime(), . ) +
                        .getMaxBatchOpenMs() +1;
                long t = ..convert(System.nanoTime(), . );
                while (.get()  && (t  < deadlineMs ) ) {
                    t = ..convert(System.nanoTime(), . );
                    //zero means "wait forever", can't have that.
                    long toWait = Math.max(1, deadlineMs - t);
                    wait(toWait);
                }
                .set(false);
                process();
            } catch (InterruptedException e) {
                failAlle );
            } catch (AmazonClientException e) {
                failAlle );
            } catch (RuntimeException e) {
                failAlle );
                throw e;
            } catch (Error e) {
                failAllnew AmazonClientException("Error encountered"e) );
                throw e;
            } finally {
                //make a copy of the listener since it can be modified from outside
                Listener<OutboundBatchTask<R, Result>> completionListener = ;
                if (completionListener != null)
                    completionListener.invoke(this);
            }
        }
        private void failAllException e) {
            forQueueBufferFuture<R, Result> f :  ) {
                f.setFailure(e);
            }
        }
    }
    private class SendMessageBatchTask extends
            OutboundBatchTask<SendMessageRequest, SendMessageResult> {
        int batchSizeBytes = 0;
        @Override
        protected synchronized boolean isOkToAdd(SendMessageRequest request) {
            return ( .size() < .getMaxBatchSize() ) &&
                    ((request.getMessageBody().getBytes().length + ) < .getMaxBatchSizeBytes());
        }
        @Override
        protected void onRequestAdded(SendMessageRequest request) {
             += request.getMessageBody().getBytes().length;
        }
        @Override
        synchronized boolean isFull() {
            return ( .size() >= .getMaxBatchSize() ) ||
                    (  >= .getMaxBatchSizeBytes());
        }
        @Override
        void process() {
            if (.isEmpty())
                return;
            SendMessageBatchRequest batchRequest = new SendMessageBatchRequest()
                    .withQueueUrl();
            ResultConverter.appendUserAgent(batchRequest.);
            List<SendMessageBatchRequestEntry> entries = new ArrayList<SendMessageBatchRequestEntry>(
                    .size());
            for (int i = 0, n = .size(); i < ni++)
                entries.add(new SendMessageBatchRequestEntry()
                        .withId(Integer.toString(i))
                        .withMessageBody(.get(i).getMessageBody())
                        .withDelaySeconds(.get(i).getDelaySeconds())
                        .withMessageAttributes(.get(i).getMessageAttributes()));
            batchRequest.setEntries(entries);
            SendMessageBatchResult batchResult = 
                    .sendMessageBatch(batchRequest);
            for (SendMessageBatchResultEntry entry : batchResult
                    .getSuccessful()) {
                int index = Integer.parseInt(entry.getId());
                .get(index).setSuccess(ResultConverter.convert(entry));
            }
            for (BatchResultErrorEntry errorEntry : batchResult.getFailed()) {
                int index = Integer.parseInt(errorEntry.getId());
                if ( errorEntry.isSenderFault() ) {
                    .get(index).setFailure( ResultConverter.convert(errorEntry));
                } else {
                    //retry.
                    try {
                        //this will retry internally up to 3 times.
                        .get(index).setSuccess(.sendMessage(.get(index)));
                    } catch ( AmazonClientException ace ) {
                        .get(index).setFailure(ace);
                    }
                }
            }
        }
    }
    private class DeleteMessageBatchTask extends
            OutboundBatchTask<DeleteMessageRequestVoid> {
        @Override
        void process() {
            if (.isEmpty())
                return;
            DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest()
                    .withQueueUrl();
            ResultConverter.appendUserAgent(batchRequest.);
                    .size());
            for (int i = 0, n = .size(); i < ni++)
                entries.add(new DeleteMessageBatchRequestEntry().withId(
                        Integer.toString(i)).withReceiptHandle(
                        .get(i).getReceiptHandle()));
            batchRequest.setEntries(entries);
            DeleteMessageBatchResult batchResult = 
                    .deleteMessageBatch(batchRequest);
            for (DeleteMessageBatchResultEntry entry : batchResult
                    .getSuccessful()) {
                int index = Integer.parseInt(entry.getId());
                .get(index).setSuccess(null);
            }
            for (BatchResultErrorEntry errorEntry : batchResult.getFailed()) {
                int index = Integer.parseInt(errorEntry.getId());
                if ( errorEntry.isSenderFault() ) {
                      .get(index).setFailure( ResultConverter.convert(errorEntry));
                } else {
                    try {
                        //retry.
                        .deleteMessage(.get(index));
                        .get(index).setSuccess(null);
                    } catch ( AmazonClientException ace ) {
                        .get(index).setFailure(ace);
                    }
                }
           }
        }
    }
    private class ChangeMessageVisibilityBatchTask extends
        @Override
        void process() {
            if (.isEmpty())
                return;
            ChangeMessageVisibilityBatchRequest batchRequest = new ChangeMessageVisibilityBatchRequest()
                    .withQueueUrl();
            ResultConverter.appendUserAgent(batchRequest.);
                    .size());
            for (int i = 0, n = .size(); i < ni++)
                entries.add(new ChangeMessageVisibilityBatchRequestEntry()
                        .withId(Integer.toString(i))
                        .withReceiptHandle(.get(i).getReceiptHandle())
                        .withVisibilityTimeout(
                                .get(i).getVisibilityTimeout()));
            batchRequest.setEntries(entries);
            ChangeMessageVisibilityBatchResult batchResult = 
                    .changeMessageVisibilityBatch(batchRequest);
            for (ChangeMessageVisibilityBatchResultEntry entry : batchResult
                    .getSuccessful()) {
                int index = Integer.parseInt(entry.getId());
                .get(index).setSuccess(null);
            }
            for (BatchResultErrorEntry errorEntry : batchResult.getFailed()) {
                int index = Integer.parseInt(errorEntry.getId());
                if ( errorEntry.isSenderFault() ) {
                    .get(index).setFailure( ResultConverter.convert(errorEntry));
                } else {
                    try {
                        //retry.
                        .changeMessageVisibility(.get(index));
                        .get(index).setSuccess(null);
                    } catch ( AmazonClientException ace ) {
                        .get(index).setFailure(ace);
                    }
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X