Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
   *
   * Licensed under the Apache License, Version 2.0 (the "License").
   * You may not use this file except in compliance with the License.
   * A copy of the License is located at
   *
   *  http://aws.amazon.com/apache2.0
   *
  * or in the "license" file accompanying this file. This file is distributed
  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  * express or implied. See the License for the specific language governing
  * permissions and limitations under the License.
  */
 package com.amazonaws.services.sqs.buffered;
 
 
A buffer to operate on an SQS queue. The buffer batches outbound ( SendMessage, DeleteMessage, ChangeMessageVisibility) requests to the queue and pre-fetches messages to receive. In practice, the buffer does almost no work itself, and delegates it to SendQueueBufer and ReceiveQueueBuffer classes.

Any errors encountered are passed through to the callers, either as the appropriate Result objects or as exceptions.

When the buffer is not used, all internal processing associated with the buffer stops when any outstanding request to SQS completes. In that idle state, the buffer uses neither connections nor threads.

Instances of QueueBuffer are thread-safe.

 
 
 class QueueBuffer {
 
     private final SendQueueBuffer sendBuffer;
     private final ReceiveQueueBuffer receiveBuffer;
     private final AmazonSQSAsync realSqs;
     QueueBufferConfig config;

    
This executor that will be shared among all queue buffers. Since a single JVM can access hundreds of queues, it won't do to have hundreds of executors spinning up hundreds of threads for each queue. The DaemonThreadFactory creates daemon threads, which means they won't block the JVM from exiting if only they are still around.
 
     static ExecutorService executor = Executors.newCachedThreadPool(new DaemonThreadFactory());;
 
     QueueBuffer(QueueBufferConfig paramConfigString urlAmazonSQSAsync sqs) {
          = sqs;
          = paramConfig;
          = new SendQueueBuffer(sqsparamConfigurl);
          = new ReceiveQueueBuffer(sqsparamConfigurl);
     }

    
asynchronously enqueues a message to SQS.

Returns:
a Future object that will be notified when the operation is completed; never null
 
                                                  AsyncHandler<SendMessageRequestSendMessageResulthandler) {
         QueueBufferCallback<SendMessageRequestSendMessageResultcallback = null;
         if (handler != null) {
             callback = new QueueBufferCallback<SendMessageRequestSendMessageResult>(handlerrequest);
         }
         QueueBufferFuture<SendMessageRequestSendMessageResultfuture = .sendMessage(requestcallback);
         future.setBuffer(this);
         return future;
     }

    
Sends a message to SQS and returns the SQS reply.

Returns:
never null
 
         Future<SendMessageResultfuture = sendMessage(requestnull);
         return waitForFuture(future);
     }

    
Asynchronously deletes a message from SQS.

Returns:
a Future object that will be notified when the operation is completed; never null
        QueueBufferCallback<DeleteMessageRequestVoidcallback = null;
        if (handler != null) {
            callback = new QueueBufferCallback<DeleteMessageRequestVoid>(handlerrequest);
        }
        QueueBufferFuture<DeleteMessageRequestVoidfuture = .deleteMessage(requestcallback);
        future.setBuffer(this);
        return future;
    }

    
Deletes a message from SQS. Does not return until a confirmation from SQS has been received

Returns:
never null
    public void deleteMessageSync(DeleteMessageRequest request) {
        Future<Voidfuture = deleteMessage(requestnull);
        waitForFuture(future);
    }

    
asynchronously adjust a message's visibility timeout to SQS.

Returns:
a Future object that will be notified when the operation is completed; never null
                                                AsyncHandler<ChangeMessageVisibilityRequestVoidhandler) {
        QueueBufferCallback<ChangeMessageVisibilityRequestVoidcallback = null;
        if (handler != null) {
            callback = new QueueBufferCallback<ChangeMessageVisibilityRequestVoid>(handlerrequest);
        }
                callback);
        future.setBuffer(this);
        return future;
    }

    
Changes visibility of a message in SQS. Does not return until a confirmation from SQS has been received.
        Future<Voidfuture = .changeMessageVisibility(requestnull);
        waitForFuture(future);
    }

    
Submits a request to receive some messages from SQS.

Returns:
a Future object that will be notified when the operation is completed; never null;
                                                       AsyncHandler<ReceiveMessageRequestReceiveMessageResulthandler) {
        // only handle simple requests, because these are the settings we pre-fetch with
        boolean noAttributes = (rq.getAttributeNames() == null) || rq.getAttributeNames().isEmpty();
        boolean noMessageAttributesToRetrieve = (rq.getMessageAttributeNames() == null)
                || rq.getMessageAttributeNames().isEmpty();
        boolean bufferngEnabled = (.getMaxInflightReceiveBatches() > 0)
                && (.getMaxDoneReceiveBatches() > 0);
        if (noMessageAttributesToRetrieve && noAttributes && bufferngEnabled && (rq.getVisibilityTimeout() == null)) {
            QueueBufferCallback<ReceiveMessageRequestReceiveMessageResultcallback = null;
            if (handler != null) {
                callback = new QueueBufferCallback<ReceiveMessageRequestReceiveMessageResult>(handlerrq);
            }
                    rqcallback);
            future.setBuffer(this);
            return future;
        } else {
            return .receiveMessageAsync(rq);
        }
    }

    
Retrieves messages from an SQS queue.

Returns:
never null
        Future<ReceiveMessageResultfuture = receiveMessage(rqnull);
        return waitForFuture(future);
    }

    
Shuts down the queue buffer. Once this method has been called, the queue buffer is not operational and all subsequent calls to it may fail
    public void shutdown() {
        // send buffer does not require shutdown, only
        // shut down receive buffer
        .shutdown();
    }

    
this method carefully waits for futures. If waiting throws, it converts the exceptions to the exceptions that SQS clients expect. This is what we use to turn asynchronous calls into synchronous ones
    private <ResultType> ResultType waitForFuture(Future<ResultType> future) {
        ResultType toReturn = null;
        try {
            toReturn = future.get();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            AmazonClientException ce = new AmazonClientException(
                    "Thread interrupted while waiting for execution result");
            ce.initCause(ie);
            throw ce;
        } catch (ExecutionException ee) {
            // if the cause of the execution exception is an SQS exception, extract it
            // and throw the extracted exception to the clients
            // otherwise, wrap ee in an SQS exception and throw that.
            Throwable cause = ee.getCause();
            if (cause instanceof AmazonClientException) {
                throw (AmazonClientExceptioncause;
            }
            AmazonClientException ce = new AmazonClientException(
                    "Caught an exception while waiting for request to complete...");
            ce.initCause(ee);
            throw ce;
        }
        return toReturn;
    }

    
We need daemon threads in our executor so that we don't keep the process running if our executor threads are the only ones left in the process.
    private static class DaemonThreadFactory implements ThreadFactory {
        static AtomicInteger threadCount = new AtomicInteger(0);
        public Thread newThread(Runnable r) {
            int threadNumber = .addAndGet(1);
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("SQSQueueBufferWorkerThread-" + threadNumber);
            return thread;
        }
    }
New to GrepCode? Check out our FAQ X