Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012 International Business Machines Corp.
   * 
   * See the NOTICE file distributed with this work for additional information
   * regarding copyright ownership. Licensed under the Apache License, 
   * Version 2.0 (the "License"); you may not use this file except in compliance
   * with the License. You may obtain a copy of the License at
   * 
   *   http://www.apache.org/licenses/LICENSE-2.0
  * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.ibm.jbatch.container.impl;
 
 import java.util.List;
 
 
Change the name of this class to something else!! Or change BaseStepControllerImpl.
 
 public abstract class BaseStepControllerImpl implements IExecutionElementController {
 
 	private final static String sourceClass = BatchletStepControllerImpl.class.getName();
 	private final static Logger logger = Logger.getLogger();
 
 	protected JobInstance jobInstance;
 
 	protected Step step;
 	protected StepStatus stepStatus;
 
 
 	protected long rootJobExecutionId;
 
 	protected static IBatchKernelService batchKernel = ServicesManagerImpl.getInstance().getBatchKernelService();
 
 
 
 
 	protected BaseStepControllerImpl(RuntimeJobExecution jobExecutionStep stepStepContextImpl stepContextlong rootJobExecutionId) {
 		this. = jobExecution;
 		this. = jobExecution.getJobInstance();
 		this. = stepContext;
 		this. = rootJobExecutionId;
 		if (step == null) {
 			throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
 		}
 		this. = step;
 	}
 	
 	protected BaseStepControllerImpl(RuntimeJobExecution jobExecutionStep stepStepContextImpl stepContext,  long rootJobExecutionIdBlockingQueue<PartitionDataWrapperanalyzerStatusQueue) {
 		this(jobExecutionstepstepContextrootJobExecutionId);
 		this. = analyzerStatusQueue;
 	}
	///////////////////////////
	// ABSTRACT METHODS ARE HERE
	///////////////////////////
	protected abstract void setupStepArtifacts();
	protected abstract void invokePreStepArtifacts();
	protected abstract void invokePostStepArtifacts();
	// This is only useful from the partition threads
	protected abstract void sendStatusFromPartitionToAnalyzerIfPresent();
		// Here we're just setting up to decide if we're going to run the step or not (if it's already complete and 
		// allow-start-if-complete=false.
		try {
			boolean executeStep = shouldStepBeExecuted();
			if (!executeStep) {
				.fine("Not going to run this step.  Returning previous exit status of: " + .getExitStatus());
catch (Throwable t) {
			// Treat an error at this point as unrecoverable, so fail job too.
			rethrowWithWarning("Caught throwable while determining if step should be executed.  Failing job."t);
		}
		// At this point we have a StepExecution.  Setup so that we're ready to invoke artifacts.
		try {
catch (Throwable t) {
			// Treat an error at this point as unrecoverable, so fail job too.
			rethrowWithWarning("Caught throwable while starting step.  Failing job."t);
		}
		// At this point artifacts are in the picture so we want to try to invoke afterStep() on a failure.
		try {
			invokePreStepArtifacts();    //Call PartitionReducer and StepListener(s)
catch (Exception e) {
			// We're going to continue on so that we can execute the afterStep() and analyzer
			try {
				StringWriter sw = new StringWriter();
				PrintWriter pw = new PrintWriter(sw);
				.warning("Caught exception executing step: " + sw.toString());
catch(Throwable t) {
				// Since the first one is the original first failure, let's rethrow t1 and not the second error,
				// but we'll log a severe error pointing out that the failure didn't get persisted..
				// We won't try to call the afterStep() in this case either.
				StringWriter sw = new StringWriter();
				PrintWriter pw = new PrintWriter(sw);
				rethrowWithSevere("ERROR. PERSISTING BATCH STATUS FAILED.  STEP EXECUTION STATUS TABLES MIGHT HAVE CONSISTENCY ISSUES" +
						"AND/OR UNEXPECTED ENTRIES."t);
			}
catch (Throwable t) {
			StringWriter sw = new StringWriter();
			PrintWriter pw = new PrintWriter(sw);
			.warning("Failing both step AND job after catching error executing step: " + sw.toString());
		}
		//
		// At this point we may have already failed the step, but we still try to invoke the end of step artifacts.
		//
		try {
			//Call PartitionAnalyzer, PartitionReducer and StepListener(s)
catch (Throwable t) {
			StringWriter sw = new StringWriter();
			PrintWriter pw = new PrintWriter(sw);
			.warning("Error invoking end of step artifacts. Stack trace: " + sw.toString());
		}
		//
		// No more application code is on the path from here on out (excluding the call to the PartitionAnalyzer
		// analyzeStatus().  If an exception bubbles up and leaves the statuses inconsistent or incorrect then so be it; 
		// maybe there's a runtime bug that will need to be fixed.
		// 
		try {
			// Now that all step-level artifacts have had a chance to run, 
			// we set the exit status to one of the defaults if it is still unset.
			// This is going to be the very last sequence of calls from the step running on the main thread,
			// since the call back to the partition analyzer only happens on the partition threads.
			// On the partition threads, then, we harden the status at the partition level before we
			// send it back to the main thread.
catch (Throwable t) {
			// Don't let an exception caught here prevent us from persisting the failed batch status.
			rethrowWithWarning("Failure ending step execution"t);
		//
		// Only happens on main thread.
		//
		.finer("Returning step batchStatus: " + .getBatchStatus() + 
				", exitStatus: " + .getExitStatus()); 
else {
		}
	}
	private void defaultExitStatusIfNecessary() {
		String stepExitStatus = .getExitStatus();
		if (stepExitStatus != null) {
			.fine("Returning with user-set exit status: " + stepExitStatus);
else if (processRetVal != null) {
			.fine("Returning with exit status from batchlet.process(): " + processRetVal);
			.setExitStatus(processRetVal);
else {
			.fine("Returning with default exit status");
		}
	}	
	private void markStepFailed() {
	}
	protected void markJobAndStepFailed() {
	}
	private void startStep() {
		// Update status
		//Set Step context properties
		//Set up step artifacts like step listeners, partition reducers
		// Move batch status to started.
		long time = System.currentTimeMillis();
		Timestamp startTS = new Timestamp(time);
	}


The only valid states at this point are STARTED,STOPPING, or FAILED. been able to get to STOPPED, or COMPLETED yet at this point in the code.
	private void transitionToFinalBatchStatus() {
		BatchStatus currentBatchStatus = .getBatchStatus();
		if (currentBatchStatus.equals(.)) {
else if (currentBatchStatus.equals(.)) {
else if (currentBatchStatus.equals(.)) {
			updateBatchStatus(.);           // Should have already been done but maybe better for possible code refactoring to have it here.
else {
			throw new IllegalStateException("Step batch status should not be in a " + currentBatchStatus.name() + " state");
		}
	}
	protected void updateBatchStatus(BatchStatus updatedBatchStatus) {
		.fine("Updating batch status from : " + .getBatchStatus() + ", to: " + updatedBatchStatus);
		.setBatchStatus(updatedBatchStatus);
		.setBatchStatus(updatedBatchStatus);
	}
	protected boolean shouldStepBeExecuted() {
			.finer("In shouldStepBeExecuted() with stepContext =  " + this.);
		}
		if ( == null) {
			.finer("No existing step status found.  Create new step execution and proceed to execution.");
			// create new step execution
			// create new step status for this run
			return true;
else {
			.finer("Existing step status found.");
			// if a step status already exists for this instance id. It means this
			// is a restart and we need to get the previously persisted data
				// Seems better to let the start count get incremented without getting a step execution than
				// vice versa (in an unexpected error case).
				// create new step execution
				return true;
else {
				return false;
			}
		}
	}
	private boolean shouldStepBeExecutedOnRestart() {
		BatchStatus stepBatchStatus = .getBatchStatus();
		if (stepBatchStatus.equals(.)) {
			// A bit of parsing involved since the model gives us a String not a
			// boolean, but it should default to 'false', which is the spec'd default.
				.fine("Step: " + .getId() + " already has batch status of COMPLETED, so won't be run again since it does not allow start if complete.");
				return false;
else {
				.fine("Step: " + .getId() + " already has batch status of COMPLETED, and allow-start-if-complete is set to 'true'");
			}
		}
		// The spec default is '0', which we get by initializing to '0' in the next line
		int startLimit = 0;
		String startLimitString = .getStartLimit();
		if (startLimitString != null) {
			try {
				startLimit = Integer.parseInt(startLimitString);
catch (NumberFormatException e) {
				throw new IllegalArgumentException("Could not parse start limit value.  Received NumberFormatException for start-limit value:  " + startLimitString 
" for stepId: " + .getId() + ", with start-limit=" + .getStartLimit());
			}
		}
		if (startLimit < 0) {
			throw new IllegalArgumentException("Found negative start-limit of " + startLimit + "for stepId: " + .getId());
		}
		if (startLimit > 0) {
			int newStepStartCount = .getStartCount() + 1;
			if (newStepStartCount > startLimit) {
				throw new IllegalStateException("For stepId: " + .getId() + ", tried to start step for the " + newStepStartCount
" time, but startLimit = " + startLimit);
else {
				.fine("Starting (possibly restarting) step: " + .getId() + ", since newStepStartCount = " + newStepStartCount
" and startLimit=" + startLimit);
			}
		}
		return true;
	}
	protected void statusStarting() {
	}
	protected void persistUserData() {
		ByteArrayOutputStream persistentBAOS = new ByteArrayOutputStream();
		ObjectOutputStream persistentDataOOS = null;
		try {
			persistentDataOOS = new ObjectOutputStream(persistentBAOS);
			persistentDataOOS.close();
catch (Exception e) {
			throw new BatchContainerServiceException("Cannot persist the persistent user data for the step."e);
		}
	}
		// set the end time metric before flushing
		long time = System.currentTimeMillis();
		Timestamp endTS = new Timestamp(time);
	}
	private StepExecutionImpl getNewStepExecution(long rootJobExecutionIdStepContextImpl stepContext) {
		return .createStepExecution(rootJobExecutionIdstepContext);
	}
	private void setContextProperties() {
		if (jslProps != null) {
			for (Property property : jslProps.getPropertyList()) {
				contextProps.setProperty(property.getName(), property.getValue());
			}	
		}
		// set up metrics
	}
	public void setStepContext(StepContextImpl stepContext) {
		this. = stepContext;
	}
	}
	public void setAnalyzerQueue(BlockingQueue<PartitionDataWrapperanalyzerQueue) {
		this. = analyzerQueue;
	}
    @Override
    public List<LonggetLastRunStepExecutions() {
        
        List<LongstepExecIdList = new ArrayList<Long>(1);
        stepExecIdList.add(this..getLastRunStepExecutionId());
        
        return stepExecIdList;
    }
	private void rethrowWithMsg(String msgBeginningThrowable tLevel level) {
		String errorMsg = msgBeginning + " ; Caught exception/error: " + t.getLocalizedMessage();
		StringWriter sw = new StringWriter();
		PrintWriter pw = new PrintWriter(sw);
		.log(levelerrorMsg + " : Stack trace: " + sw.toString());
		throw new BatchContainerRuntimeException(errorMsgt);
	}
	private void rethrowWithWarning(String msgBeginningThrowable t) {
		rethrowWithMsg(msgBeginningt.);
	}
	private void rethrowWithSevere(String msgBeginningThrowable t) {
		rethrowWithMsg(msgBeginningt.);
	}
	public String toString() {
		return "BaseStepControllerImpl for step = " + .getId();
	}
New to GrepCode? Check out our FAQ X