Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2011-2015 Amazon Technologies, Inc.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at:
   *
   *    http://aws.amazon.com/apache2.0
   *
   * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
   * OR CONDITIONS OF ANY KIND, either express or implied. See the
   * License for the specific language governing permissions and
   * limitations under the License.
   */
 package com.amazonaws.services.dynamodbv2.datamodeling;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.model.ScanRequest;
 import com.amazonaws.services.dynamodbv2.model.ScanResult;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 
 public class ParallelScanTask {

    
The list of hard copies of ScanRequest with different segment number.
 
     private final List<ScanRequestparallelScanRequests;
 
     private final int totalSegments;

    
Cache all the future tasks, so that we can extract the exception when we see failed segment scan.
 
     private final List<Future<ScanResult>> segmentScanFutureTasks;

    
Cache all the most recent ScanResult on each segment.
 
     private final List<ScanResultsegmentScanResults;

    
The current state of the scan on each segment. Used as the monitor for synchronization.
 
     private final List<SegmentScanStatesegmentScanStates;
 
     private ExecutorService executorService;
 
     private final AmazonDynamoDB dynamo;
 
     @Deprecated
     public ParallelScanTask(DynamoDBMapper mapperAmazonDynamoDB dynamoList<ScanRequestparallelScanRequests) {
         this(dynamoparallelScanRequests);
     }
 
     ParallelScanTask(AmazonDynamoDB dynamoList<ScanRequestparallelScanRequests) {
         this. = dynamo;
         this. = parallelScanRequests;
         this. = parallelScanRequests.size();
          = Executors.newCachedThreadPool();
 
         // Create synchronized views of the list to guarantee any changes are visible across all threads.
          = Collections.synchronizedList(new ArrayList<ScanResult>());
 
         initSegmentScanStates();
     }
 
     String getTableName() {
         return .get(0).getTableName();
     }
 
     public boolean isAllSegmentScanFinished() {
         synchronized() {
             for (int segment = 0; segment < segment++) {
                 if (.get(segment) != .)
                     return false;
             }
             // Shut down if all data have been scanned and loaded.
             .shutdown();
             return true;
         }
     }
 
     public List<ScanResultgetNextBatchOfScanResults() throws AmazonClientException {
        
Kick-off all the parallel scan tasks.
        startScanNextPages();
        
Wait till all the tasks have finished.
        synchronized() {
            while (.contains(.)
                    || .contains(.)) {
                try {
                    .wait();
                } catch (InterruptedException ie) {
                    throw new AmazonClientException("Parallel scan interrupted by other thread."ie);
                }
            }
            
Keep the lock on segmentScanStates until all the cached results are marshaled and returned.
            return marshalParallelScanResults();
        }
    }
    private void startScanNextPages() {
        for (int segment = 0; segment < segment++) {
            final int currentSegment = segment;
            final SegmentScanState currentSegmentState = .get(currentSegment);
            
Assert: Should never see any task in state of "Scanning" when starting a new batch.
            if (currentSegmentState == .){
                throw new AmazonClientException("Should never see a 'Scanning' state when starting parallel scans.");
            }
            
Skip any failed or completed segment, and clear the corresponding cached result.
            else if (currentSegmentState == .
                    || currentSegmentState == .) {
                .set(currentSegmentnull);
                continue;
            }
            
Otherwise, submit a new future task and save it in segmentScanFutureTasks.
            else {
                // Update the state to "Scanning" and notify any waiting thread.
                synchronized() {
                    .set(currentSegment.);
                    .notifyAll();
                }
                Future<ScanResultfutureTask = .submit(new Callable<ScanResult>() {
                    @Override
                    public ScanResult call() throws Exception {
                        try {
                            if (currentSegmentState == .) {
                                return scanNextPageOfSegment(currentSegmenttrue);
                            }
                            else if (currentSegmentState == .) {
                                return scanNextPageOfSegment(currentSegmentfalse);
                            }
                            else {
                                throw new AmazonClientException("Should not start a new future task");
                            }
                        } catch (Exception e) {
                            synchronized() {
                                .set(currentSegment.);
                                .notifyAll();
                            }
                            throw e;
                        }
                    }
                });
                // Cache the future task (for getting the Exceptions in the working thread).
                .set(currentSegmentfutureTask);
            }
        }
    }
        List<ScanResultscanResults = new LinkedList<ScanResult>();
        for (int segment = 0; segment < segment++) {
            SegmentScanState currentSegmentState = .get(segment);
            
Rethrow the exception from any failed segment scan.
            if (currentSegmentState == .) {
                try {
                    .get(segment).get();
                    throw new AmazonClientException("No Exception found in the failed scan task.");
                } catch (ExecutionException ee) {
                    if ( ee.getCause() instanceof AmazonClientException) {
                        throw (AmazonClientException) (ee.getCause());
                    } else {
                        throw new AmazonClientException("Internal error during the scan on segment #" + segment + ".",
                                ee.getCause());
                    }
                } catch (Exception e) {
                    throw new AmazonClientException("Error during the scan on segment #" + segment + "."e);
                }
            }
            
Get the ScanResult from cache if the segment scan has finished.
            else if (currentSegmentState == .
                    || currentSegmentState == .) {
                ScanResult scanResult = .get(segment);
                scanResults.add(scanResult);
            }
            else if (currentSegmentState == .
                    || currentSegmentState == .){
                throw new AmazonClientException("Should never see a 'Scanning' or 'Waiting' state when marshalling parallel scan results.");
            }
        }
        return scanResults;
    }
    private ScanResult scanNextPageOfSegment(int currentSegmentboolean checkLastEvaluatedKey) {
        ScanRequest segmentScanRequest = .get(currentSegment);
        if (checkLastEvaluatedKey) {
            ScanResult lastScanResult = .get(currentSegment);
            segmentScanRequest.setExclusiveStartKey(lastScanResult.getLastEvaluatedKey());
        } else {
            segmentScanRequest.setExclusiveStartKey(null);
        }
        ScanResult scanResult = .scan(DynamoDBMapper.applyUserAgent(segmentScanRequest));

        
Cache the scan result in segmentScanResults. We should never try to get these scan results by calling get() on the cached future tasks.
        .set(currentSegmentscanResult);

        
Update the state and notify any waiting thread.
        synchronized() {
            if (null == scanResult.getLastEvaluatedKey())
                .set(currentSegment.);
            else
                .set(currentSegment.);
            .notifyAll();
        }
        return scanResult;
    }
    private void initSegmentScanStates() {
        for (int segment = 0; segment < segment++) {
            .add(null);
            .add(null);
        }
    }

    
Enumeration of the possible states of the scan on a segment.
    private static enum SegmentScanState {
        
The scan on the segment is waiting for resources to execute and has not started yet.
        Waiting,

        
The scan is in process, and hasn't finished yet.
        Scanning,

        
The scan has already failed.
        Failed,

        
The scan on the current page has finished, but there are more pages in the segment to be scanned.
        HasNextPage,

        
The scan on the whole segment has completed.
        SegmentScanCompleted,
    }
New to GrepCode? Check out our FAQ X