Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2012 International Business Machines Corp.
   * 
   * See the NOTICE file distributed with this work for additional information
   * regarding copyright ownership. Licensed under the Apache License, 
   * Version 2.0 (the "License"); you may not use this file except in compliance
   * with the License. You may obtain a copy of the License at
   * 
   *   http://www.apache.org/licenses/LICENSE-2.0
  * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.ibm.jbatch.container.impl;
 
 import java.util.List;
 
 
 
 
 	private final static String sourceClass = PartitionedStepControllerImpl.class.getName();
 	private final static Logger logger = Logger.getLogger();
 
 	private static final int DEFAULT_PARTITION_INSTANCES = 1;
 	private static final int DEFAULT_THREADS = 0; //0 means default to number of instances
 
 	private PartitionPlan plan = null;
 
 	private int threads = ;
 
 	private Properties[] partitionProperties = null;
 
 
 
 	// On invocation this will be re-primed to reflect already-completed partitions from a previous execution.
 
 
 	final List<JSLJobsubJobs = new ArrayList<JSLJob>();
 	protected List<StepListenerProxystepListeners = null;
 
 	
 
 	protected PartitionedStepControllerImpl(final RuntimeJobExecution jobExecutionImplfinal Step stepStepContextImpl stepContextlong rootJobExecutionId) {
 		super(jobExecutionImplstepstepContextrootJobExecutionId);
 	}
 
 	public void stop() {
 
		// It's possible we may try to stop a partitioned step before any
		// sub steps have been started.
		synchronized () {
			if ( != null) {
					try {
catch (Exception e) {
						// TODO - Is this what we want to know.  
						// Blow up if it happens to force the issue.
						throw new IllegalStateException(e);
					}
				}
			}
		}
	}
		// Determine the number of partitions
		PartitionPlan plan = null;
		Integer previousNumPartitions = null;
		final PartitionMapper partitionMapper = .getPartition().getMapper();
		//from persisted plan from previous run
		if (.getNumPartitions() != null) {
			previousNumPartitions = .getNumPartitions();
		}
		if (partitionMapper != null) { //from partition mapper
			PartitionMapperProxy partitionMapperProxy;
			final List<PropertypropertyList = partitionMapper.getProperties() == null ? null
partitionMapper.getProperties().getPropertyList();
			// Set all the contexts associated with this controller.
			// Some of them may be null
					propertyList);
			try {
				partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(
						partitionMapper.getRef(), injectionRef);
catch (final ArtifactValidationException e) {
						"Cannot create the PartitionMapper ["
partitionMapper.getRef() + "]"e);
			}
			PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
			//Set up the new partition plan
			plan = new BatchPartitionPlan();
			//When true is specified, the partition count from the current run
			//is used and all results from past partitions are discarded.
			if (mapperPlan.getPartitionsOverride() || previousNumPartitions == null){
				plan.setPartitions(mapperPlan.getPartitions());
else {
				plan.setPartitions(previousNumPartitions);
			}
			if (mapperPlan.getThreads() == 0) {
else {
				plan.setThreads(mapperPlan.getThreads());    
			}
				.fine("Partition plan defined by partition mapper: " + plan);
			}
else if (.getPartition().getPlan() != null) { //from static partition element in jsl
			String partitionsAttr = .getPartition().getPlan().getPartitions();
			String threadsAttr = null;
			int numPartitions = .;
			int numThreads;
			Properties[] partitionProps = null;
			if (partitionsAttr != null) {
				try {
					numPartitions = Integer.parseInt(partitionsAttr);
catch (final NumberFormatException e) {
					throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + .getId()
", with instances=" + partitionsAttre);
				}   
				partitionProps = new Properties[numPartitions];
				if (numPartitions < 1) {
					throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + .getId()
", with instances=" + partitionsAttr);
				}
			}
			threadsAttr = .getPartition().getPlan().getThreads();
			if (threadsAttr != null) {
				try {
					numThreads = Integer.parseInt(threadsAttr);
					if (numThreads == 0) {
						numThreads = numPartitions;
					}
catch (final NumberFormatException e) {
					throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + .getId()
", with threads=" + threadsAttre);
				}   
				if (numThreads < 0) {
					throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + .getId()
", with threads=" + threadsAttr);
				}
else { //default to number of partitions if threads isn't set
				numThreads = numPartitions;
			}
			if (.getPartition().getPlan().getProperties() != null) {
				for (JSLProperties props : jslProperties) {
					int targetPartition = Integer.parseInt(props.getPartition());
                    try {
                        partitionProps[targetPartition] = CloneUtility.jslPropertiesToJavaProperties(props);
                    } catch (ArrayIndexOutOfBoundsException e) {
                        throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are "
                                + jslProperties.size()
                                + " partition properties lists defined. Remember that partition indexing is 0 based like Java arrays."e);
                    }
                }
			}
			plan = new BatchPartitionPlan();
			plan.setPartitions(numPartitions);
			plan.setThreads(numThreads);
			plan.setPartitionProperties(partitionProps);
			plan.setPartitionsOverride(false); //FIXME what is the default for a static plan??
		}
		// Set the other instance variables for convenience.
		this. = plan.getPartitions();
		this. = plan.getThreads();
		return plan;
	}
		this. = this.generatePartitionPlan();
		//persist the partition plan so on restart we have the same plan to reuse
		/* When true is specified, the partition count from the current run
		 * is used and all results from past partitions are discarded. Any
		 * resource cleanup or back out of work done in the previous run is the
		 * responsibility of the application. The PartitionReducer artifact's
		 * rollbackPartitionedStep method is invoked during restart before any
		 * partitions begin processing to provide a cleanup hook.
		 */
			if (this. != null) {
			}
		}
		.fine("Number of partitions in step: " +  + " in step " + .getId() + "; Subjob properties defined by partition mapper: " + );
		//Set up a blocking queue to pick up collector data from a partitioned thread
		if (this. != null) {
		}
		// Build all sub jobs from partitioned step
		// kick off the threads
		// Deal with the results.
	}
		synchronized () {		
			//check if we've already issued a stop
				.fine("Step already in STOPPING state, exiting from buildSubJobBatchWorkUnits() before beginning execution");
				return;
			}
			for (int instance = 0; instance < instance++) {
			}
			// Then build all the subjobs but do not start them yet
else {
			}
			// NOTE:  At this point I might not have as many work units as I had partitions, since some may have already completed.
		}
	}
			.fine("Step already in STOPPING state, exiting from executeAndWaitForCompletion() before beginning execution");
			return;
		}
		int numTotalForThisExecution = .size();
		this. =  - numTotalForThisExecution
		int numCurrentCompleted = 0;
		int numCurrentSubmitted = 0;
		.fine("Calculated that " +  + " partitions are already complete out of total # = " 
 + ", with # remaining =" + numTotalForThisExecution);
		//Start up to to the max num we are allowed from the num threads attribute
		for (int i=0; i < this. && i < numTotalForThisExecutioni++, numCurrentSubmitted++) {
else {
			}
		}
		boolean readyToSubmitAnother = false;
		while (true) {
			.finer("Begin main loop in waitForQueueCompletion(), readyToSubmitAnother = " + readyToSubmitAnother);
			try {
				if ( != null) {
					.fine("Found analyzer, proceeding on analyzerQueue path");
						.finer("Analyze collector data: " + dataWrapper.getCollectorData());
						continue// without being ready to submit another
else if (..equals(dataWrapper.getEventType())) {
						.fine("Analyze status called for completed partition: batchStatus= " + dataWrapper.getBatchstatus() + ", exitStatus = " + dataWrapper.getExitStatus());
						.add(.take());  // Shouldn't be a a long wait.
						readyToSubmitAnother = true;
else {
						.warning("Invalid partition state");
						throw new IllegalStateException("Invalid partition state");
					}
else {
					.fine("No analyzer, proceeding on completedWorkQueue path");
					// block until at least one thread has finished to
					// submit more batch work. hold on to the finished work to look at later
					readyToSubmitAnother = true;
				}
catch (InterruptedException e) {
				.severe("Caught exc"e);
			}
			if (readyToSubmitAnother) {
				numCurrentCompleted++;
				.fine("Ready to submit another (if there is another left to submit); numCurrentCompleted = " + numCurrentCompleted);
				if (numCurrentCompleted < numTotalForThisExecution) {
					if (numCurrentSubmitted < numTotalForThisExecution) {
						.fine("Submitting # " + numCurrentSubmitted + " out of " + numTotalForThisExecution + " total for this execution");
						if (.getStartCount() > 1) {
else {
						}
						readyToSubmitAnother = false;
					}
else {
					.fine("Finished... breaking out of loop");
					break;
				}
else {
				.fine("Not ready to submit another."); // Must have just done a collector
			}
		}
	}        
	private void checkCompletedWork() {
			.fine("Check completed work list.");
		}

check the batch status of each subJob after it's done to see if we need to issue a rollback start rollback if any have stopped or failed
		boolean rollback = false;
		boolean partitionFailed = false;
		for (final BatchWorkUnit subJob : ) {
			if (batchStatus.equals(.)) {
				.fine("Subjob " + subJob.getJobExecutionImpl().getExecutionId() + " ended with status '" + batchStatus + "'; Starting logical transaction rollback.");
				rollback = true;
				partitionFailed = true;
				//Keep track of the failing status and throw an exception to propagate after the rest of the partitions are complete
		}
		//If rollback is false we never issued a rollback so we can issue a logicalTXSynchronizationBeforeCompletion
		//NOTE: this will get issued even in a subjob fails or stops if no logicalTXSynchronizationRollback method is provied
		//We are assuming that not providing a rollback was intentional
		if (rollback == true) {
			if (this. != null) {
			}
			if (partitionFailed) {
				throw new BatchContainerRuntimeException("One or more partitions failed");
			}
else {
			if (this. != null) {
			}
		}
	}
	protected void setupStepArtifacts() {
		InjectionReferences injectionRef = null;
		if (analyzer != null) {
			final List<PropertypropList = analyzer.getProperties() == null ? null : analyzer.getProperties()
			injectionRef = new InjectionReferences(.getJobContext(), propList);
			try {
				 = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef);
catch (final ArtifactValidationException e) {
				throw new BatchContainerServiceException("Cannot create the analyzer [" + analyzer.getRef() + "]"e);
			}
		PartitionReducer partitionReducer = .getPartition().getReducer();
		if (partitionReducer != null) {
			final List<PropertypropList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties()
			injectionRef = new InjectionReferences(.getJobContext(), propList);
			try {
				this. = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef);
catch (final ArtifactValidationException e) {
				throw new BatchContainerServiceException("Cannot create the analyzer [" + partitionReducer.getRef() + "]",
						e);
			}
		}
	}
	protected void invokePreStepArtifacts() {
		if ( != null) {
			for (StepListenerProxy listenerProxy : ) {
				// Call beforeStep on all the step listeners
				listenerProxy.beforeStep();
			}
		}
		// Invoke the reducer before all parallel steps start (must occur
		// before mapper as well)
		if (this. != null) {
		}
	}
	protected void invokePostStepArtifacts() {
		// Invoke the reducer after all parallel steps are done
		if (this. != null) {
			}else {
			}
		}
		// Called in spec'd order, e.g. Sec. 11.7
		if ( != null) {
			for (StepListenerProxy listenerProxy : ) {
				// Call afterStep on all the step listeners
				listenerProxy.afterStep();
			}
		}
	}
		// Since we're already on the main thread, there will never
		// be anything to do on this thread.  It's only on the partitioned
		// threads that there is something to send back.
	}
New to GrepCode? Check out our FAQ X