Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  http://aws.amazon.com/apache2.0
   *
  * or in the "license" file accompanying this file. This file is distributed
  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
 package com.amazonaws.services.sqs.buffered;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 // NOTE(review): the imports for the org.apache.commons.logging and com.amazonaws.* types
 // used by this class were lost when the file was extracted; restore them from upstream.
 
 
This class is responsible for buffering outgoing SQS requests, i.e. requests to send a message, delete a message and change the visibility of the message.
When a request arrives, the buffer adds the message to a message batch of an appropriate type (creating such a batch if there currently isn't one outstanding). When the outstanding batch becomes full, or when a configurable timeout expires, the buffer makes a call to SQS to execute the current batch.
Internally, the batch objects maintain a list of futures corresponding to the requests added to them. When a batch completes, it loads the results into the futures and marks the futures as complete.
 
 public class SendQueueBuffer {
     private static Log log = LogFactory.getLog(SendQueueBuffer.class);
 
     // Interface to support event notifications with a parameter.
     private interface Listener<T> {
         void invoke(T o);
     };

    
Config settings for this buffer
 
     private final QueueBufferConfig config;

    
Url of our queue
 
     private final String qUrl;

    
The AmazonSQS client to use for this buffer's operations.
 
     private final AmazonSQS sqsClient;

    
The executor service for the batching tasks.
 
     private final Executor executor;

    
Object used to serialize sendMessage calls.
 
     private final Object sendMessageLock = new Object();

    
Object used to serialize deleteMessage calls.
 
     private final Object deleteMessageLock = new Object();

    
Object used to serialize changeMessageVisibility calls.
 
     private final Object changeMessageVisibilityLock = new Object();

    
Current batching task for sendMessage. Using a size 1 array to allow "passing by reference". Synchronized by sendMessageLock.
    private final SendMessageBatchTask[] openSendMessageBatchTask = new SendMessageBatchTask[1];

    
Current batching task for deleteMessage. Using a size 1 array to allow "passing by reference". Synchronized by deleteMessageLock.
    private final DeleteMessageBatchTask[] openDeleteMessageBatchTask = new DeleteMessageBatchTask[1];

    
Current batching task for changeMessageVisibility. Using a size 1 array to allow "passing by reference". Synchronized by changeMessageVisibilityLock.
Permits controlling the number of in flight SendMessage batches.
    private final Semaphore inflightSendMessageBatches;

    
Permits controlling the number of in flight DeleteMessage batches.
    private final Semaphore inflightDeleteMessageBatches;

    
Permits controlling the number of in flight ChangeMessageVisibility batches.
    SendQueueBuffer(AmazonSQS sqsClientExecutor executorQueueBufferConfig paramConfigString url) {
        this. = sqsClient;
        this. = executor;
        this. = paramConfig;
         = url;
        int maxBatch = .getMaxInflightOutboundBatches();
        // must allow at least one outbound batch.
        maxBatch = maxBatch > 0 ? maxBatch : 1;
        this. = new Semaphore(maxBatch);
        this. = new Semaphore(maxBatch);
        this. = new Semaphore(maxBatch);
    }
    public QueueBufferConfig getConfig() {
        return ;
    }

    

Returns:
never null
                                                                                QueueBufferCallback<SendMessageRequestSendMessageResultcallback) {
                requestcallback);
        return result;
    }

    

Returns:
never null
                                                                       QueueBufferCallback<DeleteMessageRequestVoidcallback) {
                callback);
    }

    

Returns:
never null
                                                                                           QueueBufferCallback<ChangeMessageVisibilityRequestVoidcallback) {
                callback);
    }

    

Returns:
new OutboundBatchTask of appropriate type, never null
    @SuppressWarnings("unchecked")
    private <R extends AmazonWebServiceRequest, Result> OutboundBatchTask<R, Result> newOutboundBatchTask(R request) {
        if (request instanceof SendMessageRequest)
            return (OutboundBatchTask<R, Result>) new SendMessageBatchTask();
        else if (request instanceof DeleteMessageRequest)
            return (OutboundBatchTask<R, Result>) new DeleteMessageBatchTask();
        else if (request instanceof ChangeMessageVisibilityRequest)
            return (OutboundBatchTask<R, Result>) new ChangeMessageVisibilityBatchTask();
        else
            // this should never happen
            throw new IllegalArgumentException("Unsupported request type " + request.getClass().getName());
    }

    
Flushes all outstanding outbound requests (SendMessage, DeleteMessage, ChangeMessageVisibility) in this buffer.

The call returns successfully when all outstanding outbound requests submitted before the call are completed (i.e. processed by SQS).

    public void flush() {
        try {
            synchronized () {
            }
            synchronized () {
            }
            synchronized () {
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    
Submits an outbound request for delivery to the queue associated with this buffer.

Parameters:
operationLock the lock synchronizing calls for the call type ( sendMessage, deleteMessage, changeMessageVisibility )
openOutboundBatchTask the open batch task for this call type
request the request to submit
inflightOperationBatches the permits controlling the batches for this type of request
Returns:
never null
Throws:
com.amazonaws.AmazonClientException (see the various outbound calls for details)
    @SuppressWarnings("unchecked")
    <OBT extends OutboundBatchTask<R, Result>, R extends AmazonWebServiceRequest, Result> QueueBufferFuture<R, Result> submitOutboundRequest(Object operationLock,
                                                                                                                                             OBT[] openOutboundBatchTask,
                                                                                                                                             R request,
                                                                                                                                             final Semaphore inflightOperationBatches,
                                                                                                                                             QueueBufferCallback<R, Result> callback) {
        /*
         * Callers add requests to a single batch task (openOutboundBatchTask) until it is full or
         * maxBatchOpenMs elapses. The total number of batch task in flight is controlled by the
         * inflightOperationBatch semaphore capped at maxInflightOutboundBatches.
         */
        QueueBufferFuture<R, Result> theFuture = null;
        try {
            synchronized (operationLock) {
                if (openOutboundBatchTask[0] == null
                        || ((theFuture = openOutboundBatchTask[0].addRequest(requestcallback))) == null) {
                    OBT obt = (OBT) newOutboundBatchTask(request);
                    inflightOperationBatches.acquire();
                    openOutboundBatchTask[0] = obt;
                    // Register a listener for the event signaling that the
                    // batch task has completed (successfully or not).
                    openOutboundBatchTask[0].setOnCompleted(new Listener<OutboundBatchTask<R, Result>>() {
                        @Override
                        public void invoke(OutboundBatchTask<R, Result> task) {
                            inflightOperationBatches.release();
                        }
                    });
                    if (.isTraceEnabled()) {
                        .trace("Queue " +  + " created new batch for " + request.getClass().toString() + " "
                                + inflightOperationBatches.availablePermits() + " free slots remain");
                    }
                    theFuture = openOutboundBatchTask[0].addRequest(requestcallback);
                    .execute(openOutboundBatchTask[0]);
                    if (null == theFuture) {
                        // this can happen only if the request itself is flawed,
                        // so that it can't be added to any batch, even a brand
                        // new one
                        throw new AmazonClientException("Failed to schedule request " + request + " for execution");
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            AmazonClientException toThrow = new AmazonClientException("Interrupted while waiting for lock.");
            toThrow.initCause(e);
            throw toThrow;
        }
        return theFuture;
    }

    
Task to send a batch of outbound requests to SQS.

The batch task is constructed open and accepts requests until full, or until maxBatchOpenMs elapses. At that point, the batch closes and the collected requests are assembled into a single batch request to SQS. Specialized for each type of outbound request.

Instances of this class (and subclasses) are thread-safe.

Parameters:
<R> the type of the SQS request to batch
<Result> the type of result he futures issued by this task will return
    private abstract class OutboundBatchTask<R extends AmazonWebServiceRequest, Result> implements Runnable {
        protected final List<R> requests;
        protected final ArrayList<QueueBufferFuture<R, Result>> futures;
        private boolean closed;
        private volatile Listener<OutboundBatchTask<R, Result>> onCompleted;
        public OutboundBatchTask() {
            this. = new ArrayList<R>(.getMaxBatchSize());
            this. = new ArrayList<QueueBufferFuture<R, Result>>(.getMaxBatchSize());
        }
        public void setOnCompleted(Listener<OutboundBatchTask<R, Result>> value) {
             = value;
        }

        
Adds a request to the batch if it is still open and has capacity.

Returns:
the future that can be used to get the results of the execution, or null if the addition failed.
        public synchronized QueueBufferFuture<R, Result> addRequest(R requestQueueBufferCallback<R, Result> callback) {
            if () {
                return null;
            }
            QueueBufferFuture<R, Result> theFuture = addIfAllowed(requestcallback);
            // if the addition did not work, or this addition made us full,
            // we can close the request.
            if ((null == theFuture) || isFull()) {
                 = true;
                notify();
            }
            return theFuture;
        }

        
Adds the request to the batch if capacity allows it. Called by addRequest with a lock on this held.

Parameters:
request
Returns:
the future that will be signaled when the request is completed and can be used to retrieve the result. Can be null if the addition could not be done
        private QueueBufferFuture<R, Result> addIfAllowed(R requestQueueBufferCallback<R, Result> callback) {
            if (isOkToAdd(request)) {
                .add(request);
                QueueBufferFuture<R, Result> theFuture = new QueueBufferFuture<R, Result>(callback);
                .add(theFuture);
                onRequestAdded(request);
                return theFuture;
            } else {
                return null;
            }
        }

        
Checks whether it's okay to add the request to this buffer. Called by addIfAllowed with a lock on this held.

Parameters:
request the request to add
Returns:
true if the request is okay to add, false otherwise
        protected boolean isOkToAdd(R request) {
            return .size() < .getMaxBatchSize();
        }

        
A hook to be run when a request is successfully added to this buffer. Called by addIfAllowed with a lock on this held.

Parameters:
request the request that was added
        protected void onRequestAdded(R request) {
            // to be overridden by subclasses
        }

        
Checks whether the buffer is now full. Called by addIfAllowed with a lock on this held.

Returns:
whether the buffer is filled to capacity
        protected boolean isFull() {
            return .size() >= .getMaxBatchSize();
        }

        
Processes the batch once closed. Is NOT called with a lock on this. However, it's passed a local copy of both the requests and futures lists made while holding the lock.
        protected abstract void process(List<R> requestsList<QueueBufferFuture<R, Result>> futures);
        @Override
        public final void run() {
            try {
                long deadlineMs = ..convert(System.nanoTime(), .)
                        + .getMaxBatchOpenMs() + 1;
                long t = ..convert(System.nanoTime(), .);
                List<R> requests;
                List<QueueBufferFuture<R, Result>> futures;
                synchronized (this) {
                    while (! && (t < deadlineMs)) {
                        t = ..convert(System.nanoTime(), .);
                        // zero means "wait forever", can't have that.
                        long toWait = Math.max(1, deadlineMs - t);
                        wait(toWait);
                    }
                     = true;
                    requests = new ArrayList<R>(this.);
                    futures = new ArrayList<QueueBufferFuture<R, Result>>(this.);
                }
                process(requestsfutures);
            } catch (InterruptedException e) {
                failAll(e);
            } catch (AmazonClientException e) {
                failAll(e);
            } catch (RuntimeException e) {
                failAll(e);
                throw e;
            } catch (Error e) {
                failAll(new AmazonClientException("Error encountered"e));
                throw e;
            } finally {
                // make a copy of the listener since it (theoretically) can be
                // modified from the outside.
                Listener<OutboundBatchTask<R, Result>> listener = ;
                if (listener != null) {
                    listener.invoke(this);
                }
            }
        }
        private void failAll(Exception e) {
            for (QueueBufferFuture<R, Result> f : ) {
                f.setFailure(e);
            }
        }
    }
        int batchSizeBytes = 0;
        @Override
        protected boolean isOkToAdd(SendMessageRequest request) {
            return (.size() < .getMaxBatchSize())
                    && ((request.getMessageBody().getBytes().length + ) < .getMaxBatchSizeBytes());
        }
        @Override
        protected void onRequestAdded(SendMessageRequest request) {
             += request.getMessageBody().getBytes().length;
        }
        @Override
        protected boolean isFull() {
            return (.size() >= .getMaxBatchSize()) || ( >= .getMaxBatchSizeBytes());
        }
        @Override
        protected void process(List<SendMessageRequestrequests,
                               List<QueueBufferFuture<SendMessageRequestSendMessageResult>> futures) {
            if (requests.isEmpty()) {
                return;
            }
            SendMessageBatchRequest batchRequest = new SendMessageBatchRequest().withQueueUrl();
            ResultConverter.appendUserAgent(batchRequest.);
            List<SendMessageBatchRequestEntryentries = new ArrayList<SendMessageBatchRequestEntry>(requests.size());
            for (int i = 0, n = requests.size(); i < ni++) {
                entries.add(new SendMessageBatchRequestEntry().withId(Integer.toString(i))
                        .withMessageBody(requests.get(i).getMessageBody())
                        .withDelaySeconds(requests.get(i).getDelaySeconds())
                        .withMessageAttributes(requests.get(i).getMessageAttributes()));
            }
            batchRequest.setEntries(entries);
            SendMessageBatchResult batchResult = .sendMessageBatch(batchRequest);
            for (SendMessageBatchResultEntry entry : batchResult.getSuccessful()) {
                int index = Integer.parseInt(entry.getId());
                futures.get(index).setSuccess(ResultConverter.convert(entry));
            }
            for (BatchResultErrorEntry errorEntry : batchResult.getFailed()) {
                int index = Integer.parseInt(errorEntry.getId());
                if (errorEntry.isSenderFault()) {
                    futures.get(index).setFailure(ResultConverter.convert(errorEntry));
                } else {
                    // retry.
                    try {
                        // this will retry internally up to 3 times.
                        futures.get(index).setSuccess(.sendMessage(requests.get(index)));
                    } catch (AmazonClientException ace) {
                        futures.get(index).setFailure(ace);
                    }
                }
            }
        }
    }
        @Override
        protected void process(List<DeleteMessageRequestrequests,
                               List<QueueBufferFuture<DeleteMessageRequestVoid>> futures) {
            if (requests.isEmpty()) {
                return;
            }
            DeleteMessageBatchRequest batchRequest = new DeleteMessageBatchRequest().withQueueUrl();
            ResultConverter.appendUserAgent(batchRequest.);
                    requests.size());
            for (int i = 0, n = requests.size(); i < ni++) {
                entries.add(new DeleteMessageBatchRequestEntry().withId(Integer.toString(i)).withReceiptHandle(
                        requests.get(i).getReceiptHandle()));
            }
            batchRequest.setEntries(entries);
            DeleteMessageBatchResult batchResult = .deleteMessageBatch(batchRequest);
            for (DeleteMessageBatchResultEntry entry : batchResult.getSuccessful()) {
                int index = Integer.parseInt(entry.getId());
                futures.get(index).setSuccess(null);
            }
            for (BatchResultErrorEntry errorEntry : batchResult.getFailed()) {
                int index = Integer.parseInt(errorEntry.getId());
                if (errorEntry.isSenderFault()) {
                    futures.get(index).setFailure(ResultConverter.convert(errorEntry));
                } else {
                    try {
                        // retry.
                        .deleteMessage(requests.get(index));
                        futures.get(index).setSuccess(null);
                    } catch (AmazonClientException ace) {
                        futures.get(index).setFailure(ace);
                    }
                }
            }
        }
    }
        @Override
        protected void process(List<ChangeMessageVisibilityRequestrequests,
                               List<QueueBufferFuture<ChangeMessageVisibilityRequestVoid>> futures) {
            if (requests.isEmpty()) {
                return;
            }
            ChangeMessageVisibilityBatchRequest batchRequest = new ChangeMessageVisibilityBatchRequest()
                    .withQueueUrl();
            ResultConverter.appendUserAgent(batchRequest.);
                    requests.size());
            for (int i = 0, n = requests.size(); i < ni++) {
                entries.add(new ChangeMessageVisibilityBatchRequestEntry().withId(Integer.toString(i))
                        .withReceiptHandle(requests.get(i).getReceiptHandle())
                        .withVisibilityTimeout(requests.get(i).getVisibilityTimeout()));
            }
            batchRequest.setEntries(entries);
            ChangeMessageVisibilityBatchResult batchResult = .changeMessageVisibilityBatch(batchRequest);
            for (ChangeMessageVisibilityBatchResultEntry entry : batchResult.getSuccessful()) {
                int index = Integer.parseInt(entry.getId());
                futures.get(index).setSuccess(null);
            }
            for (BatchResultErrorEntry errorEntry : batchResult.getFailed()) {
                int index = Integer.parseInt(errorEntry.getId());
                if (errorEntry.isSenderFault()) {
                    futures.get(index).setFailure(ResultConverter.convert(errorEntry));
                } else {
                    try {
                        // retry.
                        .changeMessageVisibility(requests.get(index));
                        futures.get(index).setSuccess(null);
                    } catch (AmazonClientException ace) {
                        futures.get(index).setFailure(ace);
                    }
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X