  /*
   * Copyright 2006-2007 the original author or authors.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
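 package org.springframework.batch.repeat.support;

 import org.springframework.batch.repeat.RepeatCallback;
 import org.springframework.batch.repeat.RepeatContext;
 import org.springframework.batch.repeat.RepeatException;
 import org.springframework.batch.repeat.RepeatStatus;
 import org.springframework.core.task.SyncTaskExecutor;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.util.Assert;
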
Provides org.springframework.batch.repeat.RepeatOperations support including interceptors that can be used to modify or monitor the behaviour at run time.
This implementation is sufficient to be used to configure transactional behaviour for each item by making the org.springframework.batch.repeat.RepeatCallback transactional, or for the whole batch by making the execute method transactional (but only then if the task executor is synchronous).
This class is thread safe if its collaborators are thread safe (interceptors, terminationPolicy, callback). Normally this will be the case, but clients need to be aware that if the task executor is asynchronous, then the other collaborators should be also. In particular the org.springframework.batch.repeat.RepeatCallback that is wrapped in the execute method must be thread safe - often it is based on some form of data source, which itself should be both thread safe and transactional (multiple threads could be accessing it at any given time, and each thread would have its own transaction).

Author(s):
Dave Syer
 
 public class TaskExecutorRepeatTemplate extends RepeatTemplate {

 
 	public static final int DEFAULT_THROTTLE_LIMIT = 4;

 	private int throttleLimit = DEFAULT_THROTTLE_LIMIT;

 	private TaskExecutor taskExecutor = new SyncTaskExecutor();

Public setter for the throttle limit. The throttle limit is the largest number of concurrent tasks that can be executing at one time - if a new task arrives and the throttle limit is breached we wait for one of the executing tasks to finish before submitting the new one to the org.springframework.core.task.TaskExecutor. Default value is DEFAULT_THROTTLE_LIMIT. N.B. when used with a thread pooled org.springframework.core.task.TaskExecutor the thread pool might prevent the throttle limit actually being reached (so make the core pool size larger than the throttle limit if possible).

Parameters:
throttleLimit the throttleLimit to set.
 
 	public void setThrottleLimit(int throttleLimit) {
 		this.throttleLimit = throttleLimit;
 	}
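
The throttling described above can be illustrated with a small standalone sketch. It is not the Spring Batch implementation: the class ThrottleLimitSketch and the helper peakConcurrency are hypothetical names, and a plain java.util.concurrent.Semaphore stands in for the template's internal result queue. The submitter takes a permit before handing each task to the pool, so it blocks once throttleLimit tasks are in flight. It also shows the N.B. above: a pool smaller than the limit caps concurrency before the throttle does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottleLimitSketch {

    // Hypothetical helper: runs taskCount short tasks with at most throttleLimit
    // in flight and returns the highest concurrency actually observed.
    static int peakConcurrency(int taskCount, int throttleLimit, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore permits = new Semaphore(throttleLimit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                // Blocks the submitting thread while throttleLimit tasks are unfinished.
                permits.acquire();
                pool.execute(() -> {
                    try {
                        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                        Thread.sleep(5); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        permits.release(); // lets the submitter hand over the next task
                    }
                });
            }
            // Reclaiming every permit means every submitted task has finished.
            permits.acquire(throttleLimit);
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while throttling", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Pool larger than the limit: the throttle is the binding constraint.
        System.out.println("peak, pool of 8: " + peakConcurrency(20, 4, 8));
        // Pool smaller than the limit: the throttle limit is never reached.
        System.out.println("peak, pool of 2: " + peakConcurrency(20, 4, 2));
    }
}
```

The exact peak values depend on scheduling, but the first call should never report more than 4 and the second never more than 2.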

Setter for task executor to be used to run the individual item callbacks.

Parameters:
taskExecutor a TaskExecutor
Throws:
java.lang.IllegalArgumentException if the argument is null
 
 	public void setTaskExecutor(TaskExecutor taskExecutor) {
 		Assert.notNull(taskExecutor);
 		this.taskExecutor = taskExecutor;
 	}

Use the task executor configured through setTaskExecutor(org.springframework.core.task.TaskExecutor) to generate a result. The internal state in this case is a queue of unfinished result holders of type ResultHolder. The holder with the return value should not be on the queue when this method exits. The queue is scoped in the calling method, so there is no need to synchronize access.
 
 	protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
 			throws Throwable {
		ExecutingRunnable runnable = null;
		ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
		do {
			/*
			 * Wrap the callback in a runnable that will add its result to the
			 * queue when it is ready.
			 */
			runnable = new ExecutingRunnable(callback, context, queue);

Tell the runnable that it can expect a result. This could have been in-lined with the constructor, but it might block, so it's better to do it here, since we have the option (it's a private class).
			runnable.expect();
			/*
			 * Start the task possibly concurrently / in the future.
			 */
			taskExecutor.execute(runnable);
			/*
			 * Allow termination policy to update its state. This must happen
			 * immediately before or after the call to the task executor.
			 */
			update(context);
			/*
			 * Keep going until we get a result that is finished, or early
			 * termination...
			 */
		} while (queue.isEmpty() && !isComplete(context));
		/*
		 * N.B. If the queue is empty then take() blocks until a result appears,
		 * and there must be at least one because we just submitted one to the
		 * task executor.
		 */
		ResultHolder result = queue.take();
		if (result.getError() != null) {
			throw result.getError();
		}
		return result.getResult();
	}
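
The expect / put / take protocol used above can be sketched in isolation. This is a simplified stand-in, not the library's ResultHolderResultQueue: the class ResultQueueSketch and the helper sumOfSquares are hypothetical, and the assumption that put() frees the throttle permit (rather than take()) is a design choice of this sketch. expect() reserves a slot and may block at the throttle limit, put() is called by the worker when its result is ready, and take() blocks until a result is available.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

class ResultQueueSketch<T> {

    private final Semaphore waits;                                // throttles outstanding work
    private final BlockingQueue<T> results = new LinkedBlockingQueue<>();
    private final Object lock = new Object();
    private int expected = 0;                                     // announced but not yet taken

    ResultQueueSketch(int throttleLimit) {
        this.waits = new Semaphore(throttleLimit);
    }

    // Announce that a result is coming; blocks while throttleLimit tasks are running.
    void expect() throws InterruptedException {
        waits.acquire();
        synchronized (lock) {
            expected++;
        }
    }

    // Called by the worker when it has finished; frees a slot for the next task.
    void put(T result) {
        results.add(result);
        waits.release();
    }

    // Blocks until a result is available.
    T take() throws InterruptedException {
        T value = results.take();
        synchronized (lock) {
            expected--;
        }
        return value;
    }

    boolean isExpecting() {
        synchronized (lock) {
            return expected > 0;
        }
    }

    // Hypothetical demo: square 1..n on a pool, then drain every expected result.
    static int sumOfSquares(int n, int throttleLimit) {
        ExecutorService pool = Executors.newFixedThreadPool(throttleLimit);
        ResultQueueSketch<Integer> queue = new ResultQueueSketch<>(throttleLimit);
        int sum = 0;
        try {
            for (int i = 1; i <= n; i++) {
                final int item = i;
                queue.expect();                          // reserve before submitting
                pool.execute(() -> queue.put(item * item));
            }
            while (queue.isExpecting()) {
                sum += queue.take();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for results", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(5, 2)); // prints 55
    }
}
```

Calling expect() before the task is submitted is what makes the final drain loop safe: every take() is matched by a put() that has happened or will happen.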

Wait for all the results to appear on the queue and execute the after interceptors for each one.

	protected boolean waitForResults(RepeatInternalState state) {
		ResultQueue<ResultHolder> queue = ((ResultQueueInternalState) state).getResultQueue();
		boolean result = true;
		while (queue.isExpecting()) {
			/*
			 * Careful that no runnables that are not going to finish ever get
			 * onto the queue, else this may block forever.
			 */
			ResultHolder future;
			try {
				future = (ResultHolder) queue.take();
			}
			catch (InterruptedException e) {
				throw new RepeatException("InterruptedException while waiting for result.");
			}
			if (future.getError() != null) {
				state.getThrowables().add(future.getError());
			}
			else {
				RepeatStatus status = future.getResult();
				result = result && canContinue(status);
			}
		}
		Assert.state(queue.isEmpty(), "Future results queue should be empty at end of batch.");
		return result;
	}
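
	protected RepeatInternalState createInternalState(RepeatContext context) {
		// Queue of pending results:
		return new ResultQueueInternalState(throttleLimit);
	}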

A runnable that puts its result on a queue when it is done.

Author(s):
Dave Syer
	private class ExecutingRunnable implements Runnable, ResultHolder {
		private final RepeatCallback callback;
		private final RepeatContext context;
		private final ResultQueue<ResultHolder> queue;
		private volatile RepeatStatus result;
		private volatile Throwable error;
		public ExecutingRunnable(RepeatCallback callback, RepeatContext context, ResultQueue<ResultHolder> queue) {
			super();
			this.callback = callback;
			this.context = context;
			this.queue = queue;
		}

Tell the queue to expect a result.
		public void expect() {
			try {
				queue.expect();
			}
			catch (InterruptedException e) {
				throw new RepeatException("InterruptedException waiting to acquire lock on input.");
			}
		}

Execute the batch callback, and store the result, or any exception that is thrown for retrieval later by caller.

		public void run() {
			boolean clearContext = false;
			try {
				if (RepeatSynchronizationManager.getContext() == null) {
					clearContext = true;
					RepeatSynchronizationManager.register(context);
				}
				if (logger.isDebugEnabled()) {
					logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
				}
				result = callback.doInIteration(context);
			}
			catch (Exception e) {
				error = e;
			}
			finally {
				if (clearContext) {
					RepeatSynchronizationManager.clear();
				}
				// Always publish this holder, so a waiting caller is never left blocked.
				queue.put(this);
			}
		}

Get the result - never blocks because the queue manages waiting for the task to finish.
		public RepeatStatus getResult() {
			return result;
		}

Get the error - never blocks because the queue manages waiting for the task to finish.
		public Throwable getError() {
			return error;
		}

Getter for the context.
		public RepeatContext getContext() {
			return this.context;
		}
	}
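
The pattern in the runnable above (run the callback, record either the result or the error, and publish the holder in a finally block) can be shown without any Spring types. The sketch below is illustrative only: CapturingRunnableSketch, Outcome and runAll are hypothetical names, a java.util.concurrent.Callable stands in for the RepeatCallback, and plain threads stand in for the task executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

class CapturingRunnableSketch {

    // Hypothetical holder: runs a task and records either its value or its failure.
    static final class Outcome<T> implements Runnable {
        private final Callable<T> task;
        private final BlockingQueue<Outcome<T>> queue;
        private volatile T result;
        private volatile Throwable error;

        Outcome(Callable<T> task, BlockingQueue<Outcome<T>> queue) {
            this.task = task;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                result = task.call();
            } catch (Exception e) {
                error = e; // stored for the caller instead of being lost on the worker thread
            } finally {
                queue.add(this); // always publish, so the caller never waits forever
            }
        }

        T getResult() {
            return result;
        }

        Throwable getError() {
            return error;
        }
    }

    // Runs each task on its own thread and summarises the outcomes in sorted order.
    static List<String> runAll(List<Callable<String>> tasks) {
        BlockingQueue<Outcome<String>> queue = new LinkedBlockingQueue<>();
        for (Callable<String> task : tasks) {
            new Thread(new Outcome<>(task, queue)).start();
        }
        List<String> summary = new ArrayList<>();
        try {
            for (int i = 0; i < tasks.size(); i++) {
                Outcome<String> outcome = queue.take(); // blocks until some task finishes
                summary.add(outcome.getError() != null
                        ? "error:" + outcome.getError().getMessage()
                        : "ok:" + outcome.getResult());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for outcomes", e);
        }
        Collections.sort(summary); // completion order is not deterministic
        return summary;
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "a");
        tasks.add(() -> { throw new IllegalStateException("boom"); });
        System.out.println(runAll(tasks)); // prints [error:boom, ok:a]
    }
}
```

Because the holder is queued only after the task has finished, its getters never need to block, which matches the description of getResult() and getError() above.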

Author(s):
Dave Syer
	private static class ResultQueueInternalState extends RepeatInternalStateSupport {
		private final ResultQueue<ResultHolder> results;

Parameters:
throttleLimit the throttle limit for the result queue
		public ResultQueueInternalState(int throttleLimit) {
			super();
			this.results = new ResultHolderResultQueue(throttleLimit);
		}

Returns:
the result queue
		public ResultQueue<ResultHolder> getResultQueue() {
			return results;
		}
	}