Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  http://aws.amazon.com/apache2.0
   *
  * or in the "license" file accompanying this file. This file is distributed
  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
 
 package com.amazonaws.services.sqs.buffered;
 
 import java.util.List;
 
 
The ReceiveQueueBuffer class is responsible for dequeueing of messages from a single SQS queue. It uses the provided executor to pre-fetch messages from the server and keeps them in a buffer which it uses to satisfy incoming requests. The number of requests pre-fetched and kept in the buffer, as well as the maximum number of threads used to retrieve the messages are configurable.

Synchronization strategy: - Threads must hold the TaskSpawnSyncPoint object monitor to spawn a new task or modify the number of inflight tasks - Threads must hold the monitor of the "futures" list to modify the list - Threads must hold the monitor of the "finishedTasks" list to modify the list - If you need to lock both futures and finishedTasks, lock futures first and finishedTasks second

 
 public class ReceiveQueueBuffer {

    
Simple interface to represent a condition

Parameters:
<T>
 
     private interface Predicate<T> {
        

Parameters:
t Object being evaluated against the condition
Returns:
True if t meets the condition, false if not
 
         boolean test(T t);
     }
 
     private static Log log = LogFactory.getLog(ReceiveQueueBuffer.class);
 
     private final QueueBufferConfig config;
 
     private final String qUrl;
 
     private final Executor executor;
 
     private final AmazonSQS sqsClient;
 
     private long bufferCounter = 0;

    
This buffer's queue visibility timeout. Used to detect expired message that should not be returned by the receiveMessage call. Synchronized by receiveMessageLock. -1 indicates that the time is uninitialized.
 
     private volatile long visibilityTimeoutNanos = -1;

    
Used as permits controlling the number of in flight receive batches. Synchronized by taskSpawnSyncPoint.
 
     private volatile int inflightReceiveMessageBatches;

    
synchronize on this object to create new receive batches or modify inflight message count
 
     private final Object taskSpawnSyncPoint = new Object();

    
shutdown buffer does not retrieve any more messages from sqs
 
     volatile boolean shutDown = false;

    
message delivery futures we gave out
 
     private final LinkedList<ReceiveMessageFuturefutures = new LinkedList<ReceiveMessageFuture>();

    
finished batches are stored in this list.
    ReceiveQueueBuffer(AmazonSQS paramSQSExecutor paramExecutorQueueBufferConfig paramConfigString url) {
         = paramConfig;
         = paramExecutor;
         = paramSQS;
         = url;
    }

    
Prevents spawning of new retrieval batches and waits for all in-flight retrieval batches to finish
    public void shutdown() {
         = true;
        try {
            while ( > 0)
                Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    
Submits the request for retrieval of messages from the queue and returns a future that will be signalled when the request is satisfied. The future may already be signalled by the time it is returned.

Returns:
never null
                                                                                              QueueBufferCallback<ReceiveMessageRequestReceiveMessageResultcallback) {
        if () {
            throw new AmazonClientException("The client has been shut down.");
        }
        // issue the future...
        int numMessages = 10;
        if (rq.getMaxNumberOfMessages() != null) {
            numMessages = rq.getMaxNumberOfMessages();
        }
        QueueBufferFuture<ReceiveMessageRequestReceiveMessageResulttoReturn = issueFuture(numMessagescallback);
        // attempt to satisfy it right away...
        satisfyFuturesFromBuffer();
        // spawn more receive tasks if we need them...
        spawnMoreReceiveTasks();
        return toReturn;
    }

    
Creates and returns a new future object. Sleeps if the list of already-issued but as yet unsatisfied futures is over a throttle limit.

Returns:
never null
    private ReceiveMessageFuture issueFuture(int size,
                                             QueueBufferCallback<ReceiveMessageRequestReceiveMessageResultcallback) {
        synchronized () {
            ReceiveMessageFuture theFuture = new ReceiveMessageFuture(callbacksize);
            .addLast(theFuture);
            return theFuture;
        }
    }

    
Attempts to satisfy some or all of the already-issued futures from the local buffer. If the buffer is empty or there are no futures, this method won't do anything.
    private void satisfyFuturesFromBuffer() {
        synchronized () {
            synchronized () {
                // attempt to satisfy futures until we run out of either futures or
                // finished tasks
                while ((!.isEmpty()) && (!.isEmpty())) {
                    // Remove any expired tasks before attempting to fufill the future
                    pruneExpiredTasks();
                    // Fufill the future from a non expired task if there is one. There is still a
                    // slight chance that the first task could have expired between the time we
                    // pruned and the time we fufill the future
                    if (!.isEmpty()) {
                        fufillFuture(.poll());
                    }
                }
            }
        }
    }

    
Fills the future with whatever results were received by the full batch currently at the head of the completed batch queue. Those results may be retrieved messages, or an exception. This method assumes that you are holding the finished tasks lock locks when invoking it. violate this assumption at your own peril
    private void fufillFuture(ReceiveMessageFuture future) {
        ReceiveMessageResult result = new ReceiveMessageResult();
        LinkedList<Messagemessages = new LinkedList<Message>();
        result.setMessages(messages);
        Exception exception = task.getException();
        int numRetrieved = 0;
        boolean batchDone = false;
        while (numRetrieved < future.getRequestedSize()) {
            Message msg = task.removeMessage();
            // a non-empty batch can still give back a null
            // message if the message expired.
            if (msg != null) {
                messages.add(msg);
                ++numRetrieved;
            } else {
                batchDone = true;
                break;
            }
        }
        // we may have just drained the batch.
        batchDone = batchDone || task.isEmpty() || (exception != null);
        if (batchDone) {
            .removeFirst();
        }
        result.setMessages(messages);
        // if after the above runs the exception is not null,
        // the finished batch has encountered an error, and we will
        // report that in the Future. Otherwise, we will fill
        // the future with the receive result
        if (exception != null) {
            future.setFailure(exception);
        } else {
            future.setSuccess(result);
        }
    }

    
Prune any expired tasks that do not have an exception associated with them. This method assumes that you are holding the finishedTasks lock when invoking it
    private void pruneExpiredTasks() {
        int numberExpiredTasksPruned = pruneHeadTasks(new Predicate<ReceiveQueueBuffer.ReceiveMessageBatchTask>() {
            @Override
            public boolean test(ReceiveMessageBatchTask t) {
                return t.isExpired() && t.getException() == null;
            }
        });
        // If we pruned any tasks because they are expired we also want to prune any empty tasks
        // afterwards so we have a chance to receive those expired messages again.
        if (numberExpiredTasksPruned > 0) {
                @Override
                public boolean test(ReceiveMessageBatchTask t) {
                    return t.isEmpty() && t.getException() == null;
                }
            });
        }
    }

    
Prune all tasks at the beginning of the finishedTasks list that meet the given condition. Once a task is found that does not meet the given condition the pruning stops. This method assumes that you are holding the finishedTasks lock when invoking it.

Parameters:
pruneCondition Condition on whether a task is eligible to be pruned
Returns:
Number of total tasks pruned from finishedTasks
    private int pruneHeadTasks(Predicate<ReceiveMessageBatchTaskpruneCondition) {
        int numberPruned = 0;
        while (!.isEmpty()) {
            if (pruneCondition.test(.getFirst())) {
                .removeFirst();
                numberPruned++;
            } else {
                break;
            }
        }
        return numberPruned;
    }

    
maybe create more receive tasks. extra receive tasks won't be created if we are already at the maximum number of receive tasks, or if we are at the maximum number of prefetched buffers
    private void spawnMoreReceiveTasks() {
        if () {
            return;
        }
        int desiredBatches = .getMaxDoneReceiveBatches();
        desiredBatches = desiredBatches < 1 ? 1 : desiredBatches;
        synchronized () {
            if (.size() >= desiredBatches)
                return;
            // if we have some finished batches already, and
            // existing inflight batches will bring us to the limit,
            // don't spawn more. if our finished tasks cache is empty, we will
            // always spawn a thread.
            if (.size() > 0 && (.size() + ) >= desiredBatches) {
                return;
            }
        }
        synchronized () {
            if ( == -1) {
                GetQueueAttributesRequest request = new GetQueueAttributesRequest().withQueueUrl()
                        .withAttributeNames("VisibilityTimeout");
                ResultConverter.appendUserAgent(request.);
                long visibilityTimeoutSeconds = Long.parseLong(.getQueueAttributes(request).getAttributes()
                        .get("VisibilityTimeout"));
                 = ..convert(visibilityTimeoutSeconds.);
            }
            int max = .getMaxInflightReceiveBatches();
            // must allow at least one inflight receive task, or receive won't
            // work at all.
            max = max > 0 ? max : 1;
            int toSpawn = max - ;
            if (toSpawn > 0) {
                ReceiveMessageBatchTask task = new ReceiveMessageBatchTask(this);
                ++;
                ++;
                if (.isTraceEnabled()) {
                    .trace("Spawned receive batch #" +  + " (" +  + " of "
                            + max + " inflight) for queue " + );
                }
                .execute(task);
            }
        }
    }

    
This method is called by the batches after they have finished retrieving the messages.
        synchronized () {
            .addLast(batch);
            if (.isTraceEnabled()) {
                .info("Queue " +  + " now has " + .size() + " receive results cached ");
            }
        }
        synchronized () {
            --;
        }
        satisfyFuturesFromBuffer();
        spawnMoreReceiveTasks();
    }

    
Clears and nacks any pre-fetched messages in this buffer.
    public void clear() {
        boolean done = false;
        while (!done) {
            ReceiveMessageBatchTask currentBatch = null;
            synchronized () {
                currentBatch = .poll();
            }
            if (currentBatch != null) {
                currentBatch.clear();
            } else {
                // ran out of batches to clear
                done = true;
            }
        }
    }
        /* how many messages did the request ask for */
        private int requestedSize;
            super(cb);
             = paramSize;
        }
        public int getRequestedSize() {
            return ;
        }
    }

    
Task to receive messages from SQS.

The batch task is constructed !open until the ReceiveMessage completes. At that point, the batch opens and its messages (if any) become available to read.

    private class ReceiveMessageBatchTask implements Runnable {
        private Exception exception = null;
        private List<Messagemessages;
        private long visibilityDeadlineNano;
        private boolean open = false;
        private ReceiveQueueBuffer parentBuffer;

        
Constructs a receive task waiting the specified time before calling SQS.

Parameters:
waitTimeMs the time to wait before calling SQS
        ReceiveMessageBatchTask(ReceiveQueueBuffer paramParentBuffer) {
             = paramParentBuffer;
             = Collections.emptyList();
        }
        synchronized boolean isEmpty() {
            if (!) {
                throw new IllegalStateException("batch is not open");
            }
            return .isEmpty();
        }

        

Returns:
the exception that was thrown during execution, or null if there was no exception
        synchronized Exception getException() {
            if (!) {
                throw new IllegalStateException("batch is not open");
            }
            return ;
        }

        
Returns a message if one is available.

The call adjusts the message count.

Returns:
a message or null if none is available
        synchronized Message removeMessage() {
            if (!) {
                throw new IllegalStateException("batch is not open");
            }
            // our messages expired.
            if (isExpired()) {
                .clear();
                return null;
            }
            if (.isEmpty())
                return null;
            else
                return .remove(.size() - 1);
        }
        boolean isExpired() {
            return System.nanoTime() > ;
        }

        
Nacks and clears all messages remaining in the batch.
        synchronized void clear() {
            if (!) {
                throw new IllegalStateException("batch is not open");
            }
            if (!isExpired()) {
                ChangeMessageVisibilityBatchRequest batchRequest = new ChangeMessageVisibilityBatchRequest()
                        .withQueueUrl();
                ResultConverter.appendUserAgent(batchRequest.);
                        .size());
                int i = 0;
                for (Message m : ) {
                    entries.add(new ChangeMessageVisibilityBatchRequestEntry().withId(Integer.toString(i))
                            .withReceiptHandle(m.getReceiptHandle()).withVisibilityTimeout(0));
                    ++i;
                }
                try {
                    batchRequest.setEntries(entries);
                    .changeMessageVisibilityBatch(batchRequest);
                } catch (AmazonClientException e) {
                    // Log and ignore.
                    .warn("ReceiveMessageBatchTask: changeMessageVisibility failed " + e);
                }
            }
            .clear();
        }

        
Attempts to retrieve messages from SQS and upon completion (successful or unsuccessful) reports the batch as complete and open
        public void run() {
            try {
                 = System.nanoTime() + ;
                ReceiveMessageRequest request = new ReceiveMessageRequest().withMaxNumberOfMessages(
                        .getMaxBatchSize());
                ResultConverter.appendUserAgent(request.);
                if (.getVisibilityTimeoutSeconds() > 0) {
                    request.setVisibilityTimeout(.getVisibilityTimeoutSeconds());
                     = System.nanoTime()
                            + ..convert(.getVisibilityTimeoutSeconds(), .);
                }
                if (.isLongPoll()) {
                    request.withWaitTimeSeconds(.getLongPollWaitTimeoutSeconds());
                }
                 = .receiveMessage(request).getMessages();
            } catch (AmazonClientException e) {
                 = e;
            } finally {
                // whatever happened, we are done and can be considered open
                 = true;
                .reportBatchFinished(this);
            }
        }
    }
New to GrepCode? Check out our FAQ X