Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * Copyright 2012 International Business Machines Corp.
    * 
    * See the NOTICE file distributed with this work for additional information
    * regarding copyright ownership. Licensed under the Apache License, 
    * Version 2.0 (the "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    * 
    *   http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package com.ibm.jbatch.container.impl;
  
  import java.util.List;
  
  
  
  
  	private final static String sourceClass = ChunkStepControllerImpl.class.getName();
  	private final static Logger logger = Logger.getLogger();
  
  	private Chunk chunk = null;
  	private ItemReaderProxy readerProxy = null;
  	private ItemProcessorProxy processorProxy = null;
  	private ItemWriterProxy writerProxy = null;
  	private CheckpointAlgorithm chkptAlg = null;
  	private ServicesManager servicesManager = ServicesManagerImpl.getInstance();
  	private SkipHandler skipHandler = null;
 
 	// metrics
 	long readCount = 0;
 	long writeCount = 0;
 	long readSkipCount = 0;
 	long processSkipCount = 0;
 	long writeSkipCount = 0;
 	boolean rollbackRetry = false;
 
 	public ChunkStepControllerImpl(RuntimeJobExecution jobExecutionImplStep stepStepContextImpl stepContextlong rootJobExecutionIdBlockingQueue<PartitionDataWrapperanalyzerStatusQueue) {
 		super(jobExecutionImplstepstepContextrootJobExecutionIdanalyzerStatusQueue);
 	}

Utility Class to hold statuses at each level of Read-Process-Write loop
 
 	private class ItemStatus {
 
 		public boolean isSkipped() {
 			return ;
 		}
 
 		public void setSkipped(boolean skipped) {
 			this. = skipped;
 		}
 
 		public boolean isFiltered() {
 			return ;
 		}
 
 		public void setFiltered(boolean filtered) {
 			this. = filtered;
 		}
 
 		public boolean isCheckPointed() {
 			return ;
 		}
 
 		public void setCheckPointed(boolean checkPointed) {
 			this. = checkPointed;
 		}
 
 		public boolean isFinished() {
 			return ;
 		}
 
 		public void setFinished(boolean finished) {
 			this. = finished;
 		}
 
 		public boolean isRetry() {
 			return ;
 		}
 
 		public void setRetry(boolean retry) {
 			this. = retry;
 		}
 
 		public boolean isRollback() {
 			return ;
 		}
 
 		public void setRollback(boolean rollback) {
 			this. = rollback;
 		}
 
 		private boolean skipped = false;
 		private boolean filtered = false;
 		private boolean finished = false;
 		private boolean checkPointed = false;
 		private boolean retry = false;
 		private boolean rollback = false;
 
 	}

We read and process one item at a time but write in chunks (group of items). So, this method loops until we either reached the end of the reader (not more items to read), or the writer buffer is full or a checkpoint is triggered.

Parameters:
chunkSize write buffer size
theStatus flags when the read-process reached the last record or a checkpoint is required
Returns:
an array list of objects to write
 
 	private List<ObjectreadAndProcess(int chunkSizeItemStatus theStatus) {
 		.entering("readAndProcess"new Object[] { chunkSizetheStatus });
 
 		List<ObjectchunkToWrite = new ArrayList<Object>();
 		Object itemRead = null;
 		Object itemProcessed = null;
 		int readProcessedCount = 0;
 
 		while (true) {
 			ItemStatus status = new ItemStatus();
 			itemRead = readItem(status);
 
 			if (status.isRollback()) {
 				theStatus.setRollback(true);
 				// inc rollbackCount
 				break;
 			}
 
 			if (!status.isSkipped() && !status.isFinished()) {
 				itemProcessed = processItem(itemReadstatus);
 
 				if (status.isRollback()) {
 					theStatus.setRollback(true);
 					// inc rollbackCount
 					break;
 				}
 
 				if (!status.isSkipped() && !status.isFiltered()) {
 					chunkToWrite.add(itemProcessed);
 					readProcessedCount++;
 				}
 			}
 
 			theStatus.setFinished(status.isFinished());
 
 			// This will force the current item to finish processing on a stop
 			// request
 				theStatus.setFinished(true);
 			}
 
 			// write buffer size reached
 			if ((readProcessedCount == chunkSize) && (.getCheckpointType() != "custom")) {
 				break;
 			}
 
 			// checkpoint reached
 			if (theStatus.isCheckPointed()) {
 				break;
 			}
 
 			// last record in readerProxy reached
 			if (theStatus.isFinished()) {
 				break;
 			}
 
 		}
 		.exiting("readAndProcess"chunkToWrite);
 		return chunkToWrite;
 	}

Reads an item from the reader

Parameters:
status flags the current read status
Returns:
the item read
 
 	private Object readItem(ItemStatus status) {
 		.entering("readItem"status);
 		Object itemRead = null;
 
 		try {
 			// call read listeners before and after the actual read
 			for (ItemReadListenerProxy readListenerProxy : ) {
 				readListenerProxy.beforeRead();
 			}
 
 			itemRead = .readItem();
 
 			for (ItemReadListenerProxy readListenerProxy : ) {
 				readListenerProxy.afterRead(itemRead);
 			}
 
 			// itemRead == null means we reached the end of
 			// the readerProxy "resultset"
 			status.setFinished(itemRead == null);
 			if (!status.isFinished()) {
 			}
 		} catch (Exception e) {
 			for (ItemReadListenerProxy readListenerProxy : ) {
 				readListenerProxy.onReadError(e);
 			}
 			if(!) {
 				if (retryReadException(e)) {
 					for (ItemReadListenerProxy readListenerProxy : ) {
 						readListenerProxy.onReadError(e);
 					}
 					// if not a rollback exception, just retry the current item
 						itemRead = readItem(status);
 					} else {
 						status.setRollback(true);
 						 = true;
 						// inc rollbackCount
 					}
 				}
 				else if(skipReadException(e)) {
 					status.setSkipped(true);
 
 				}
 				else {
 				}
 			}
 			else {
 				// coming from a rollback retry
 					status.setSkipped(true);
 
 				}
 				else if (retryReadException(e)) {
 						itemRead = readItem(status);
 					}
 					else {
 						status.setRollback(true);
 						// inc rollbackCount
 					}
 				}
 				else {
 				}
 			}
 
 		} catch (Throwable e) {
 		}
 
 		.exiting("readItem"itemRead==null ? "<null>" : itemRead);
 		return itemRead;
 	}

Process an item previously read by the reader

Parameters:
itemRead the item read
status flags the current process status
Returns:
the processed item
 
 	private Object processItem(Object itemReadItemStatus status) {
 		.entering("processItem"new Object[] { itemReadstatus });
 		Object processedItem = null;
 
 		// if no processor defined for this chunk
 		if ( == null){
 			return itemRead;
 		}
 
 		try {
 
 			// call process listeners before and after the actual process call
 			for (ItemProcessListenerProxy processListenerProxy : ) {
 				processListenerProxy.beforeProcess(itemRead);
 			}
 
 			processedItem = .processItem(itemRead);
 
 			if (processedItem == null) {
 				// inc filterCount
 				status.setFiltered(true);
 			}
 
 			for (ItemProcessListenerProxy processListenerProxy : ) {
 				processListenerProxy.afterProcess(itemReadprocessedItem);
 			}
 		} catch (Exception e) {
 			for (ItemProcessListenerProxy processListenerProxy : ) {
 				processListenerProxy.onProcessError(processedIteme);
 			}
 			if(!) {
 				if (retryProcessException(eitemRead)) {
 						// call process listeners before and after the actual
 						// process call
 						for (ItemProcessListenerProxy processListenerProxy : ) {
 							processListenerProxy.beforeProcess(itemRead);
 						}
 						processedItem = processItem(itemReadstatus);
 						if (processedItem == null) {
 							// inc filterCount
 							status.setFiltered(true);
 						}
 
 						for (ItemProcessListenerProxy processListenerProxy : ) {
 							processListenerProxy.afterProcess(itemReadprocessedItem);
 						}
 					} else {
 						status.setRollback(true);
 						 = true;
 						// inc rollbackCount
 					}
 				}
 				else if (skipProcessException(eitemRead)) {
 					status.setSkipped(true);
 				}
 				else {
 				}
 			}
 			else {
 				if (skipProcessException(eitemRead)) {
 					status.setSkipped(true);
 				} else if (retryProcessException(eitemRead)) {
 						// call process listeners before and after the actual
 						// process call
 						for (ItemProcessListenerProxy processListenerProxy : ) {
 							processListenerProxy.beforeProcess(itemRead);
 						}
 						processedItem = processItem(itemReadstatus);
 						if (processedItem == null) {
 							// inc filterCount
 							status.setFiltered(true);
 						}
 
 						for (ItemProcessListenerProxy processListenerProxy : ) {
 							processListenerProxy.afterProcess(itemReadprocessedItem);
 						}
 					} else {
 						status.setRollback(true);
 						 = true;
 						// inc rollbackCount
 					}
 				} else {
 				}
 			}
 
 		} catch (Throwable e) {
 		}
 
 		.exiting("processItem"processedItem==null ? "<null>" : processedItem);
 		return processedItem;
 	}

Writes items

Parameters:
theChunk the array list with all items processed ready to be written
 
 	private void writeChunk(List<ObjecttheChunkItemStatus status) {
 		.entering("writeChunk"theChunk);
 		if (!theChunk.isEmpty()) {
 			try {
 
 				// call read listeners before and after the actual read
 				for (ItemWriteListenerProxy writeListenerProxy : ) {
 					writeListenerProxy.beforeWrite(theChunk);
 				}
 
 
 				for (ItemWriteListenerProxy writeListenerProxy : ) {
 					writeListenerProxy.afterWrite(theChunk);
 				}
 			} catch (Exception e) {
 				for (ItemWriteListenerProxy writeListenerProxy : ) {
 					writeListenerProxy.onWriteError(theChunke);
 				}
 				{
 					if (retryWriteException(etheChunk)) {
 							writeChunk(theChunkstatus);
 						} else {
 							 = true;
 							status.setRollback(true);
 							// inc rollbackCount
 						}
 					} else if (skipWriteException(etheChunk)) {
 					} else {
 					}
 
 				}
 				else {
 					if (skipWriteException(etheChunk)) {
 					} else if (retryWriteException(etheChunk)) {
 							status.setRetry(true);
 							writeChunk(theChunkstatus);
 						} else {
 							 = true;
 							status.setRollback(true);
 							// inc rollbackCount
 						}
 					} else {
 					}
 				}
 
 			} catch (Throwable e) {
 			}
 		}
 		.exiting("writeChunk");
 	}

Main Read-Process-Write loop

 
 	private void invokeChunk() {
 		.entering("invokeChunk2");
 
 		int itemCount = ChunkHelper.getItemCount();
 		int timeInterval = ChunkHelper.getTimeLimit();
 		List<ObjectchunkToWrite = new ArrayList<Object>();
 		boolean checkPointed = true;
 		boolean rollback = false;
 		Throwable caughtThrowable = null;
 
 		// begin new transaction at first iteration or after a checkpoint commit
 
 		try {
 
 			while (true) {
 
 				if (checkPointed || rollback) {
 					if (this..getCheckpointType() == "custom" ){
 						int newtimeOut = this..checkpointTimeout();
 					}
 					for (ChunkListenerProxy chunkProxy : ) {
 						chunkProxy.beforeChunk();
 					}
 
 					if (rollback) {
 					}
 				}
 
 				ItemStatus status = new ItemStatus();
 
 				if (rollback) {
 					rollback = false;
 				}
 
 				chunkToWrite = readAndProcess(itemCountstatus);
 
 				if (status.isRollback()) {
 					itemCount = 1;
 					rollback = true;
 
 
 
 					continue;
 				}
 
 				writeChunk(chunkToWritestatus);
 
 				if (status.isRollback()) {
 					itemCount = 1;
 					rollback = true;
 
 
 
 					continue;
 				}
 				checkPointed = status.isCheckPointed();
 
 				// we could finish the chunk in 3 conditions: buffer is full,
 				// checkpoint, not more input
 				if (status.isCheckPointed() || status.isFinished()) {
 					// TODO: missing before checkpoint listeners
 					// 1.- check if spec list proper steps for before checkpoint
 					// 2.- ask Andy about retry
 					// 3.- when do we stop?
 
 
 							for (ChunkListenerProxy chunkProxy : ) {
 								chunkProxy.afterChunk();
 							}
 
 
 
 
 
 
 							// exit loop when last record is written
 							if (status.isFinished()) {
 
 
 								// increment commitCount
 								break;
 							} else {
 								// increment commitCount
 							}
 
 				}
 
 			}
 		} catch (Exception e) {
 			caughtThrowable = e;
 			.log(."Failure in Read-Process-Write Loop"e);
 			// Only try to call onError() if we have an Exception, but not an Error.
 			for (ChunkListenerProxy chunkProxy : ) {
 				try {
 					chunkProxy.onError(e);
 				} catch (Exception e1) {
 					StringWriter sw = new StringWriter();
 					PrintWriter pw = new PrintWriter(sw);
 					.warning("Caught secondary exception when calling chunk listener onError() with stack trace: " + sw.toString() + "\n. Will continue to remaining chunk listeners (if any) and rethrow wrapping the primary exception.");
 				}
 			}
 		} catch (Throwable t) {
 			caughtThrowable = t;
 			.log(."Failure in Read-Process-Write Loop"t);
 		} finally {
 			if (caughtThrowable != null) {
 				.warning("Caught throwable in chunk processing. Attempting to close all readers and writers.");
 				.exiting("invokeChunk");
 				throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop"caughtThrowable);
 			} else {
 				.finest("Exiting normally");
 				.exiting("invokeChunk");
 			}
 		}
 	}
 
 	protected void invokeCoreStep() throws BatchContainerServiceException {
 
 		this. = .getChunk();
 
 		
 	}
 
 	private CheckpointAlgorithm getCheckpointAlgorithm(int itemCountint timeInterval) {
 		CheckpointAlgorithm alg = null;
 
 		if (.getCheckpointType() == "item") {
 			alg = new ItemCheckpointAlgorithm();
 			((ItemCheckpointAlgorithmalg).setThresholds(itemCounttimeInterval);
 		} else { // custom chkpt alg
 		}
 
 		return alg;
 	}
 
 	/*
 	 * Initialize itemreader, itemwriter, and item processor checkpoint
 	 */
 	private void initializeChunkArtifacts() {
 		String sourceMethod = "initializeChunkArtifacts";
 			.entering(sourceMethod);
 
 		int itemCount = ChunkHelper.getItemCount();
 		int timeInterval = ChunkHelper.getTimeLimit();
 		String checkpointPolicy = ChunkHelper.getCheckpointPolicy();
 
 		ItemReader itemReader = .getReader();
 		List<PropertyitemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
 		try {
 					itemReaderProps);
 
 			 = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef);
 
 				.fine("Created ItemReaderProxy for " + itemReader.getRef());
 			}
 		} catch (ArtifactValidationException e) {
 			throw new BatchContainerServiceException("Cannot create the ItemReader [" + itemReader.getRef() + "]"e);
 		}
 
 		ItemProcessor itemProcessor = .getProcessor();
 		if (itemProcessor != null){
 			List<PropertyitemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
 			try {
 
 						itemProcessorProps);
 
 				 = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef);
 					.fine("Created ItemProcessorProxy for " + itemProcessor.getRef());
 				}
 			} catch (ArtifactValidationException e) {
 				throw new BatchContainerServiceException("Cannot create the ItemProcessor [" + itemProcessor.getRef() + "]"e);
 			}
 		}
 
 		ItemWriter itemWriter = .getWriter();
 		List<PropertyitemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
 		try {
 					itemWriterProps);
 
 			 = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef);
 				.fine("Created ItemWriterProxy for " + itemWriter.getRef());
 			}
 		} catch (ArtifactValidationException e) {
 			throw new BatchContainerServiceException("Cannot create the ItemWriter [" + itemWriter.getRef() + "]"e);
 		}
 
 		try {
 			List<PropertypropList = null;
 
 			if (.getCheckpointAlgorithm() != null) {
 
 			}
 
 					propList);
 
 			 = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(injectionRef);
 				.fine("Created CheckpointAlgorithmProxy for policy [" + checkpointPolicy + "]");
 			}
 		} catch (ArtifactValidationException e) {
 			throw new BatchContainerServiceException("Cannot create the CheckpointAlgorithm for policy [" + .getCheckpointPolicy()
 					+ "]"e);
 		}
 
 				null);
 
 
 		if (.getCheckpointType() == "item") {
 			((ItemCheckpointAlgorithm).setThresholds(itemCounttimeInterval);
 		} else { // custom chkpt alg
 		}
 
 			.fine("Setting contexts for chunk artifacts");
 		}
 
 			.fine("Initialize checkpoint manager with item-count=" + itemCount);
 		.fine("Initialize checkpoint manager with time-interval=" + timeInterval);
 
 
 
 
 
 			.exiting(sourceMethod);
 	}
 
 	private void openReaderAndWriter() {
 		String sourceMethod = "openReaderAndWriter";
 
 			.entering(sourceMethod);
 
 		try {
 
 			// check for data in backing store
 			if (readerChkptData != null) {
 
 				byte[] readertoken = readerChkptData.getRestartToken();
 				ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
 				TCCLObjectInputStream readerOIS = null;
 				try {
 					readerOIS = new TCCLObjectInputStream(readerChkptBA);
 					readerOIS.close();
 				} catch (Exception ex) {
 					// is this what I should be throwing here?
 							throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + .getId() + "]"ex);
 				}
 			} else {
 				// no chkpt data exists in the backing store
 				readerChkptData = null;
 			}
 		} catch (ClassCastException e) {
 			.warning("Expected CheckpointData but found" + readerChkptData );
 			throw new IllegalStateException("Expected CheckpointData but found" + readerChkptData );
 		}
 
 
 		try {
 			// check for data in backing store
 			if (writerChkptData != null) {
 				byte[] writertoken = writerChkptData.getRestartToken();
 				ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
 				TCCLObjectInputStream writerOIS = null;
 				try {
 					writerOIS = new TCCLObjectInputStream(writerChkptBA);
 					writerOIS.close();
 				} catch (Exception ex) {
 					// is this what I should be throwing here?
 							throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + .getId() + "]"ex);
 				}
 			} else {
 				// no chkpt data exists in the backing store
 				writerChkptData = null;
 			}
 		} catch (ClassCastException e) {
 			.warning("Expected Checkpoint but found" + writerChkptData);
 			throw new IllegalStateException("Expected Checkpoint but found" + writerChkptData);
 		}
 
 		// set up metrics
 		// stepContext.addMetric(MetricImpl.Counter.valueOf("READ_COUNT"), 0);
 		// stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_COUNT"), 0);
 		// stepContext.addMetric(MetricImpl.Counter.valueOf("READ_SKIP_COUNT"),
 		// 0);
 		// stepContext.addMetric(MetricImpl.Counter.valueOf("PROCESS_SKIP_COUNT"),
 		// 0);
 		// stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_SKIP_COUNT"),
 		// 0);
 
 			.exiting(sourceMethod);
 	}
 
 	public void stop() {
 
 		// we don't need to call stop on the chunk implementation here since a
 		// chunk always returns control to
 		// the batch container after every item.
 
 	}
 
 	boolean skipReadException(Exception e) {
 
 		try {
 		} catch (BatchContainerRuntimeException bcre) {
 			return false;
 		}
 
 		return true;
 
 	}
 
 	boolean retryReadException(Exception e) {
 
 		try {
 		} catch (BatchContainerRuntimeException bcre) {
 			return false;
 		}
 
 		return true;
 
 	}
 
 	boolean skipProcessException(Exception eObject record) {
 
 		try {
 		} catch (BatchContainerRuntimeException bcre) {
 			return false;
 		}
 
 		return true;
 
 	}
 
 	boolean retryProcessException(Exception eObject record) {
 
 		try {
 		} catch (BatchContainerRuntimeException bcre) {
 			return false;
 		}
 
 		return true;
 
 	}
 
 	boolean skipWriteException(Exception eList<ObjectchunkToWrite) {
 
 
 
 		try {
 		} catch (BatchContainerRuntimeException bcre) {
 			return false;
 		}
 
 
 		return true;
 
 	}
 
 	boolean retryWriteException(Exception eList<ObjectchunkToWrite) {
 
 		try {
 		} catch (BatchContainerRuntimeException bcre) {
 			return false;
 		}
 
 		return true;
 
 	}
 
 	private void positionReaderAtCheckpoint() {
 
 		try {
 			// check for data in backing store
 			if (readerData != null) {
 				byte[] readertoken = readerData.getRestartToken();
 				ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
 				TCCLObjectInputStream readerOIS = null;
 				try {
 					readerOIS = new TCCLObjectInputStream(readerChkptBA);
					readerOIS.close();
catch (Exception ex) {
					// is this what I should be throwing here?
							throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + .getId() + "]"ex);
else {
				// no chkpt data exists in the backing store
				readerData = null;
catch (ClassCastException e) {
			throw new IllegalStateException("Expected CheckpointData but found" + readerData);
	private void positionWriterAtCheckpoint() {
		try {
			// check for data in backing store
			if (writerData != null) {
				byte[] writertoken = writerData.getRestartToken();
				ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
				TCCLObjectInputStream writerOIS = null;
				try {
					writerOIS = new TCCLObjectInputStream(writerChkptBA);
					writerOIS.close();
catch (Exception ex) {
					// is this what I should be throwing here?
							throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + .getId() + "]"ex);
else {
				// no chkpt data exists in the backing store
				writerData = null;
catch (ClassCastException e) {
			throw new IllegalStateException("Expected CheckpointData but found" + writerData);