Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2002-2007 the original author or authors.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.springframework.batch.repeat.support;
 
 
An implementation of the ResultQueue that throttles the number of expected results, limiting it to a maximum at any given time.

Author(s):
Dave Syer
 
 public class ResultHolderResultQueue implements ResultQueue<ResultHolder> {
 
 	// Accumulation of result objects as they finish.
 	private final BlockingQueue<ResultHolderresults;
 
 	// Accumulation of dummy objects flagging expected results in the future.
 	private final Semaphore waits;
 
 	private final Object lock = new Object();
 
 	private volatile int count = 0;

Parameters:
throttleLimit the maximum number of results that can be expected at any given time.
 
 	public ResultHolderResultQueue(int throttleLimit) {
 		 = new Semaphore(throttleLimit);
 	}
 
 	public boolean isEmpty() {
 		return .isEmpty();
 	}
 
 	/*
 	 * (non-Javadoc)
 	 * 
 	 * @see org.springframework.batch.repeat.support.ResultQueue#isExpecting()
 	 */
 	public boolean isExpecting() {
 		// Base the decision about whether we expect more results on a
 		// counter of the number of expected results actually collected.
 		// Do not synchronize! Otherwise put and expect can deadlock.
 		return  > 0;
 	}

Tell the queue to expect one more result. Blocks until a new result is available if already expecting too many (as determined by the throttle limit).

 
 	public void expect() throws InterruptedException {
 		// Don't acquire the lock in a synchronized block - might deadlock
 		synchronized () {
 			++;
 		}
 	}
 
 	public void put(ResultHolder holderthrows IllegalArgumentException {
 		if (!isExpecting()) {
 			throw new IllegalArgumentException("Not expecting a result.  Call expect() before put().");
 		}
 		.add(holder);
 		// Take from the waits queue now to allow another result to
 		// accumulate. But don't decrement the counter.
 		synchronized () {
 		}
 	}

Get the next result as soon as it becomes available.

Release result immediately if:
  • There is a result that is continuable.
Otherwise block if either:
  • There is no result (as per contract of ResultQueue).
  • The number of results is less than the number expected.
Error if either:
  • Not expecting.
  • Interrupted.

		if (!isExpecting()) {
			throw new NoSuchElementException("Not expecting a result.  Call expect() before take().");
		}
		synchronized () {
			value = .take();
			if (isContinuable(value)) {
				// Decrement the counter only when the result is collected.
				return value;
			}
		}
		.put(value);
		synchronized () {
			while ( > .size()) {
			}
			value = .take();
		}
		return value;
	}
	private boolean isContinuable(ResultHolder value) {
		return value.getResult() != null && value.getResult().isContinuable();
	}

Compares ResultHolders so that one that is continuable ranks lowest.

Author(s):
Dave Syer
	private static class ResultHolderComparator implements Comparator<ResultHolder> {
		public int compare(ResultHolder h1ResultHolder h2) {
			RepeatStatus result1 = h1.getResult();
			RepeatStatus result2 = h2.getResult();
			if (result1 == null && result2 == null) {
				return 0;
			}
			if (result1 == null) {
				return -1;
			}
			else if (result2 == null) {
				return 1;
			}
			if ((result1.isContinuable() && result2.isContinuable())
					|| (!result1.isContinuable() && !result2.isContinuable())) {
				return 0;
			}
			if (result1.isContinuable()) {
				return -1;
			}
			return 1;
		}
	}
New to GrepCode? Check out our FAQ X