  /*
   * Copyright 2006-2007 the original author or authors.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
 
 package org.springframework.batch.repeat.support;
 
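 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.batch.repeat.CompletionPolicy;
 import org.springframework.batch.repeat.RepeatCallback;
 import org.springframework.batch.repeat.RepeatContext;
 import org.springframework.batch.repeat.RepeatException;
 import org.springframework.batch.repeat.RepeatListener;
 import org.springframework.batch.repeat.RepeatOperations;
 import org.springframework.batch.repeat.RepeatStatus;
 import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
 import org.springframework.batch.repeat.exception.ExceptionHandler;
 import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy;
 import org.springframework.util.Assert;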
 
Simple implementation and base class for batch templates implementing org.springframework.batch.repeat.RepeatOperations. Provides a framework including interceptors and policies. Subclasses just need to provide a method that gets the next result and one that waits for all the results to be returned from concurrent processes or threads.
N.B. the template accumulates thrown exceptions during the iteration, and they are all processed together when the main loop ends (i.e. finished processing the items). Clients that do not want to stop execution when an exception is thrown can use a specific org.springframework.batch.repeat.CompletionPolicy that does not finish when exceptions are received. This is not the default behaviour.
Clients that want to take some business action when an exception is thrown by the org.springframework.batch.repeat.RepeatCallback can consider using a custom org.springframework.batch.repeat.RepeatListener instead of trying to customise the org.springframework.batch.repeat.CompletionPolicy. This is generally a friendlier interface to implement: in the code below, the listener's onError method is passed the java.lang.Throwable thrown by the business processing, while org.springframework.batch.repeat.RepeatListener.after(org.springframework.batch.repeat.RepeatContext,org.springframework.batch.repeat.RepeatStatus) is passed the result only when the callback returned a continuable status. If the exception is not to be propagated to the caller, then a non-default org.springframework.batch.repeat.CompletionPolicy needs to be provided as well, but that could be off the shelf, with the business action implemented only in the listener.

Author(s):
Dave Syer
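
The control flow described above can be sketched without any Spring Batch dependency. The following is a simplified illustration, not the template itself: `RepeatLoopSketch`, `Status`, `Callback` and `Handler` are hypothetical stand-ins, and the real class additionally consults a completion policy, notifies listeners and defers rethrown exceptions until the listeners have been closed.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the repeat loop; none of these types are Spring Batch classes. */
final class RepeatLoopSketch {

    enum Status { CONTINUABLE, FINISHED }

    /** Hypothetical callback: one unit of business processing per call. */
    interface Callback {
        Status doInIteration(int count);
    }

    /** Hypothetical handler: rethrow to end the loop, return normally to carry on. */
    interface Handler {
        void handle(RuntimeException e);
    }

    /** Repeats the callback until it reports FINISHED or the handler rethrows; returns the iterations started. */
    static int iterate(Callback callback, Handler handler) {
        int started = 0;
        Status status = Status.CONTINUABLE;
        while (status == Status.CONTINUABLE) {
            try {
                status = callback.doInIteration(started++);
            }
            catch (RuntimeException e) {
                // An exception alone does not end the loop; the handler decides.
                handler.handle(e);
            }
        }
        return started;
    }

    public static void main(String[] args) {
        final List<String> swallowed = new ArrayList<String>();
        int started = iterate(
                count -> {
                    if (count == 1) {
                        throw new IllegalStateException("bad item");
                    }
                    return count < 3 ? Status.CONTINUABLE : Status.FINISHED;
                },
                e -> swallowed.add(e.getMessage()));
        System.out.println(started + " iterations started, swallowed " + swallowed);
    }
}
```

A handler that simply rethrows corresponds to the default behaviour described above, where the first exception ends the batch.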
 
 public class RepeatTemplate implements RepeatOperations {
 
 	protected Log logger = LogFactory.getLog(getClass());
 
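 	private RepeatListener[] listeners = new RepeatListener[] {};
 
 	private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy();
 
 	private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();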
 
Set the listeners for this template, registering them for callbacks at appropriate times in the iteration.

Parameters:
listeners
 
 	public void setListeners(RepeatListener[] listeners) {
 		this.listeners = Arrays.asList(listeners).toArray(new RepeatListener[listeners.length]);
 	}

Register an additional listener.

Parameters:
listener
 
 	public void registerListener(RepeatListener listener) {
 		List<RepeatListener> list = new ArrayList<RepeatListener>(Arrays.asList(listeners));
 		list.add(listener);
 		listeners = (RepeatListener[]) list.toArray(new RepeatListener[list.size()]);
 	}
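
Registration order matters because the template walks the listener array forwards for open and before, and backwards for after, onError and close, so that listeners nest like a stack. A minimal sketch of that ordering, assuming a hypothetical two-method `Listener` interface in place of the real RepeatListener:

```java
import java.util.ArrayList;
import java.util.List;

final class ListenerOrderSketch {

    /** Hypothetical stand-in for a listener with paired callbacks. */
    interface Listener {
        void open(List<String> log);

        void close(List<String> log);
    }

    /** Creates a listener that records its own name on each callback. */
    static Listener named(final String name) {
        return new Listener() {
            public void open(List<String> log) {
                log.add("open " + name);
            }

            public void close(List<String> log) {
                log.add("close " + name);
            }
        };
    }

    /** Opens listeners in registration order and closes them in reverse order. */
    static List<String> run(Listener[] listeners) {
        List<String> log = new ArrayList<String>();
        for (int i = 0; i < listeners.length; i++) {
            listeners[i].open(log);
        }
        for (int i = listeners.length; i-- > 0;) {
            listeners[i].close(log);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(new Listener[] { named("a"), named("b") }));
    }
}
```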

Setter for exception handler strategy. The exception handler is called at the end of a batch, after the org.springframework.batch.repeat.CompletionPolicy has determined that the batch is complete. By default all exceptions are re-thrown.

	public void setExceptionHandler(ExceptionHandler exceptionHandler) {
		this.exceptionHandler = exceptionHandler;
	}

Setter for policy to decide when the batch is complete. The default is to complete normally when the callback returns a org.springframework.batch.repeat.RepeatStatus which is not marked as continuable, and abnormally when the callback throws an exception (but the decision to re-throw the exception is deferred to the org.springframework.batch.repeat.exception.ExceptionHandler).

Parameters:
terminationPolicy a CompletionPolicy.
Throws:
java.lang.IllegalArgumentException if the argument is null
See also:
setExceptionHandler(org.springframework.batch.repeat.exception.ExceptionHandler)
	public void setCompletionPolicy(CompletionPolicy terminationPolicy) {
		Assert.notNull(terminationPolicy);
		this.completionPolicy = terminationPolicy;
	}

Execute the batch callback until the completion policy decides that we are finished. Wait for the whole batch to finish before returning even if the task executor is asynchronous.

	public RepeatStatus iterate(RepeatCallback callback) {
		RepeatContext outer = RepeatSynchronizationManager.getContext();
		RepeatStatus result = RepeatStatus.CONTINUABLE;
		try {
			// This works with an asynchronous TaskExecutor: the
			// interceptors have to wait for the child processes.
			result = executeInternal(callback);
		}
		finally {
			RepeatSynchronizationManager.clear();
			if (outer != null) {
				RepeatSynchronizationManager.register(outer);
			}
		}
		return result;
	}
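
The try/finally in iterate is what makes nested batches safe: whatever context was bound to the thread before the call is put back afterwards, even if the inner batch fails. A rough sketch of the same save-and-restore idiom, assuming a hypothetical `ThreadLocal` holder of plain strings in place of RepeatSynchronizationManager and RepeatContext:

```java
import java.util.function.Supplier;

final class ContextRestoreSketch {

    // Hypothetical thread-bound holder; a stand-in for RepeatSynchronizationManager.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<String>();

    static void register(String context) {
        CURRENT.set(context);
    }

    static String current() {
        return CURRENT.get();
    }

    static void clear() {
        CURRENT.remove();
    }

    /** Binds a nested context for the duration of the body, then restores the outer one. */
    static <T> T runNested(String nested, Supplier<T> body) {
        String outer = current();
        try {
            register(nested);
            return body.get();
        }
        finally {
            clear();
            if (outer != null) {
                register(outer);
            }
        }
    }

    public static void main(String[] args) {
        register("outer");
        System.out.println("inside: " + runNested("inner", () -> current()));
        System.out.println("after: " + current());
    }
}
```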

Internal convenience method to loop over interceptors and batch callbacks.

Parameters:
callback the callback to process each element of the loop.
Returns:
the aggregate of ContinuationPolicy#canContinue(Object) for all the results from the callback.
	private RepeatStatus executeInternal(final RepeatCallback callback) {
		// Reset the termination policy if there is one...
		RepeatContext context = start();
		// Make sure if we are already marked complete before we start then no
		// processing takes place.
		boolean running = !isMarkedComplete(context);
		for (int i = 0; i < listeners.length; i++) {
			RepeatListener interceptor = listeners[i];
			interceptor.open(context);
			running = running && !isMarkedComplete(context);
			if (!running)
				break;
		}
		// Return value, default is to allow continued processing.
		RepeatStatus result = RepeatStatus.CONTINUABLE;
		RepeatInternalState state = createInternalState(context);
		// This is the list of exceptions thrown by all active callbacks
		Collection<Throwable> throwables = state.getThrowables();
		// Keep a separate list of exceptions we handled that need to be
		// rethrown
		Collection<Throwable> deferred = new ArrayList<Throwable>();
		try {
			while (running) {
				/*
				 * Run the before interceptors here, not in the task executor so
				 * that they all happen in the same thread - it's easier for
				 * tracking batch status, amongst other things.
				 */
				for (int i = 0; i < listeners.length; i++) {
					RepeatListener interceptor = listeners[i];
					interceptor.before(context);
					// Allow before interceptors to veto the batch by setting
					// flag.
					running = running && !isMarkedComplete(context);
				}
				// Check that we are still running (should always be true) ...
				if (running) {
					try {
						result = getNextResult(context, callback, state);
						executeAfterInterceptors(context, result);
					}
					catch (Throwable throwable) {
						doHandle(throwable, context, deferred);
					}
					// N.B. the order may be important here:
					if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
						running = false;
					}
				}
			}
			result = result.and(waitForResults(state));
			for (Throwable throwable : throwables) {
				doHandle(throwable, context, deferred);
			}
			// Explicitly drop any references to internal state...
			state = null;
		}
		/*
		 * No need for explicit catch here - if the business processing threw an
		 * exception it was already handled by the helper methods. An exception
		 * here is necessarily fatal.
		 */
		finally {
			try {
				if (!deferred.isEmpty()) {
					Throwable throwable = (Throwable) deferred.iterator().next();
					logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): "
							+ throwable.getClass().getName() + ": " + throwable.getMessage());
					rethrow(throwable);
				}
			}
			finally {
				try {
					for (int i = listeners.length; i-- > 0;) {
						RepeatListener interceptor = listeners[i];
						interceptor.close(context);
					}
				}
				finally {
					context.close();
				}
			}
		}
		return result;
	}
	private void doHandle(Throwable throwable, RepeatContext context, Collection<Throwable> deferred) {
		// An exception alone is not sufficient grounds for not
		// continuing
		Throwable unwrappedThrowable = unwrapIfRethrown(throwable);
		try {
			for (int i = listeners.length; i-- > 0;) {
				RepeatListener interceptor = listeners[i];
				// This is not an error - only log at debug
				// level.
				logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")", unwrappedThrowable);
				interceptor.onError(context, unwrappedThrowable);
			}
			logger.debug("Handling exception: " + throwable.getClass().getName() + ", caused by: "
					+ unwrappedThrowable.getClass().getName() + ": " + unwrappedThrowable.getMessage());
			exceptionHandler.handleException(context, unwrappedThrowable);
		}
		catch (Throwable handled) {
			deferred.add(handled);
		}
	}

Re-throws the original throwable if it is unchecked, wraps checked exceptions into org.springframework.batch.repeat.RepeatException.
	private static void rethrow(Throwable throwable) throws RuntimeException {
		if (throwable instanceof Error) {
			throw (Error) throwable;
		}
		else if (throwable instanceof RuntimeException) {
			throw (RuntimeException) throwable;
		}
		else {
			throw new RepeatException("Exception in batch process", throwable);
		}
	}

Unwraps the throwable if it has been wrapped by rethrow(java.lang.Throwable).
	private static Throwable unwrapIfRethrown(Throwable throwable) {
		if (throwable instanceof RepeatException) {
			return throwable.getCause();
		}
		else {
			return throwable;
		}
	}
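
Taken together, the two helpers above form a round trip: a checked exception is wrapped on the way out and unwrapped again before listeners and the exception handler see it, while unchecked throwables pass through untouched. A self-contained sketch of that round trip, where `WrappedException` is a hypothetical stand-in for RepeatException and `capture` exists only for demonstration:

```java
import java.io.IOException;

final class RethrowSketch {

    /** Hypothetical unchecked wrapper; a stand-in for RepeatException. */
    static final class WrappedException extends RuntimeException {
        WrappedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Throws unchecked throwables as they are and wraps checked ones. */
    static void rethrow(Throwable throwable) {
        if (throwable instanceof Error) {
            throw (Error) throwable;
        }
        else if (throwable instanceof RuntimeException) {
            throw (RuntimeException) throwable;
        }
        else {
            throw new WrappedException("Exception in batch process", throwable);
        }
    }

    /** Returns the cause if the throwable is the wrapper, otherwise the throwable itself. */
    static Throwable unwrapIfRethrown(Throwable throwable) {
        return (throwable instanceof WrappedException) ? throwable.getCause() : throwable;
    }

    /** Demonstration helper: returns whatever rethrow throws for the given input. */
    static Throwable capture(Throwable throwable) {
        try {
            rethrow(throwable);
            return null; // not reached, since rethrow always throws
        }
        catch (Throwable thrown) {
            return thrown;
        }
    }

    public static void main(String[] args) {
        Throwable thrown = capture(new IOException("disk full"));
        System.out.println(thrown.getClass().getSimpleName() + " wraps " + unwrapIfRethrown(thrown));
    }
}
```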

Create an internal state object that is used to store data needed internally in the scope of an iteration. Used by subclasses to manage the queueing and retrieval of asynchronous results. The default just provides an accumulation of Throwable instances for processing at the end of the batch.

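	protected RepeatInternalState createInternalState(RepeatContext context) {
		return new RepeatInternalStateSupport();
	}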

Get the next completed result, possibly executing several callbacks until one finally finishes. Normally a subclass would have to override both this method and createInternalState(org.springframework.batch.repeat.RepeatContext) because the implementation of this method would rely on the details of the internal state.

Parameters:
context the current RepeatContext.
callback the callback to execute.
state maintained by the implementation.
Returns:
a finished result.
See also:
isComplete(org.springframework.batch.repeat.RepeatContext)
createInternalState(org.springframework.batch.repeat.RepeatContext)
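	protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
			throws Throwable {
		update(context);
		if (logger.isDebugEnabled()) {
			logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
		}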
		return callback.doInIteration(context);
	}

If necessary, wait for results to come back from remote or concurrent processes. By default does nothing and returns true.

Parameters:
state the internal state.
Returns:
true if canContinue(org.springframework.batch.repeat.RepeatStatus) is true for all results retrieved.
	protected boolean waitForResults(RepeatInternalState state) {
		// no-op by default
		return true;
	}

Check return value from batch operation.

Parameters:
value the last callback result.
Returns:
true if the value is org.springframework.batch.repeat.RepeatStatus.CONTINUABLE.
	protected final boolean canContinue(RepeatStatus value) {
		return ((RepeatStatus) value).isContinuable();
	}
	private boolean isMarkedComplete(RepeatContext context) {
		boolean complete = context.isCompleteOnly();
		if (context.getParent() != null) {
			complete = complete || isMarkedComplete(context.getParent());
		}
		if (complete) {
			logger.debug("Repeat is complete according to context alone.");
		}
		return complete;
	}
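
Because isMarkedComplete recurses through getParent(), flagging an outer batch as complete also stops every batch nested inside it. A small sketch of that propagation, assuming a hypothetical `Context` class with a public flag rather than the real RepeatContext:

```java
final class ParentCompletionSketch {

    /** Hypothetical stand-in for a context with an optional parent. */
    static final class Context {
        final Context parent;

        boolean completeOnly;

        Context(Context parent) {
            this.parent = parent;
        }
    }

    /** True if this context or any of its ancestors has been flagged complete. */
    static boolean isMarkedComplete(Context context) {
        boolean complete = context.completeOnly;
        if (context.parent != null) {
            complete = complete || isMarkedComplete(context.parent);
        }
        return complete;
    }

    public static void main(String[] args) {
        Context outer = new Context(null);
        Context inner = new Context(outer);
        System.out.println("before: " + isMarkedComplete(inner));
        outer.completeOnly = true;
        System.out.println("after: " + isMarkedComplete(inner));
    }
}
```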

Convenience method to execute after interceptors on a callback result.

Parameters:
context the current batch context.
value the result of the callback to process.
	protected void executeAfterInterceptors(final RepeatContext context, RepeatStatus value) {
		// Don't re-throw exceptions here: let the exception handler deal with
		// that...
		if (value != null && value.isContinuable()) {
			for (int i = listeners.length; i-- > 0;) {
				RepeatListener interceptor = listeners[i];
				interceptor.after(context, value);
			}
		}
	}

	protected boolean isComplete(RepeatContext context, RepeatStatus result) {
		boolean complete = completionPolicy.isComplete(context, result);
		if (complete) {
			logger.debug("Repeat is complete according to policy and result value.");
		}
		return complete;
	}

	protected boolean isComplete(RepeatContext context) {
		boolean complete = completionPolicy.isComplete(context);
		if (complete) {
			logger.debug("Repeat is complete according to policy alone not including result.");
		}
		return complete;
	}

	protected RepeatContext start() {
		RepeatContext parent = RepeatSynchronizationManager.getContext();
		RepeatContext context = completionPolicy.start(parent);
		RepeatSynchronizationManager.register(context);
		logger.debug("Starting repeat context.");
		return context;
	}

	protected void update(RepeatContext context) {
		completionPolicy.update(context);
	}