Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * Copyright 2012 International Business Machines Corp.
    * 
    * See the NOTICE file distributed with this work for additional information
    * regarding copyright ownership. Licensed under the Apache License, 
    * Version 2.0 (the "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    * 
    *   http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package com.ibm.jbatch.container.impl;
  
  import java.util.List;
  
  
  
  
  
      private final static String sourceClass = ChunkStepControllerImpl.class.getName();
      private final static Logger logger = Logger.getLogger();
  
      private Chunk chunk = null;
      private ItemReaderProxy readerProxy = null;
      private ItemProcessorProxy processorProxy = null;
      private ItemWriterProxy writerProxy = null;
      private CheckpointAlgorithmProxy checkpointProxy = null;
      private CheckpointAlgorithm chkptAlg = null;
      private CheckpointManager checkpointManager;
      private ServicesManager servicesManager = ServicesManagerImpl.getInstance();
      private SkipHandler skipHandler = null;
      CheckpointData readerChkptData = null;
      CheckpointData writerChkptData = null;
      List<ChunkListenerProxychunkListeners = null;
      private RetryHandler retryHandler;
  
     // metrics
     long readCount = 0;
     long writeCount = 0;
     long readSkipCount = 0;
     long processSkipCount = 0;
     long writeSkipCount = 0;
     boolean rollbackRetry = false;
 
     public ChunkStepControllerImpl(RuntimeJobExecutionHelper jobExecutionImplStep step) {
         super(jobExecutionImplstep);
         // TODO Auto-generated constructor stub
     }
 
     // TODO: complete refactoring, remove starts/end comment and remove old
     // read-process-write loop
     // mostly works but a few failures like
     // " WRITE: the chunk write did not at the correct boundry (idx) ->11"
     // need to debug some more
     /*
      * Refactoring starts here
      */

    
Utility Class to hold statuses at each level of Read-Process-Write loop
 
     private class ItemStatus {
 
         public boolean isSkipped() {
             return ;
         }
 
         public void setSkipped(boolean skipped) {
             this. = skipped;
         }
 
         public boolean isFiltered() {
             return ;
         }
 
         public void setFiltered(boolean filtered) {
             this. = filtered;
         }
 
         public boolean isCheckPointed() {
             return ;
         }
 
         public void setCheckPointed(boolean checkPointed) {
             this. = checkPointed;
         }
 
         public boolean isFinished() {
             return ;
         }
 
         public void setFinished(boolean finished) {
             this. = finished;
         }
 
         public boolean isRetry() {
             return ;
         }
 
         public void setRetry(boolean retry) {
             this. = retry;
         }
 
         public boolean isRollback() {
             return ;
         }
 
         public void setRollback(boolean rollback) {
             this. = rollback;
         }
 
         private boolean skipped = false;
         private boolean filtered = false;
         private boolean finished = false;
         private boolean checkPointed = false;
         private boolean retry = false;
         private boolean rollback = false;
 
     }

    
We read and process one item at a time but write in chunks (group of items). So, this method loops until we either reached the end of the reader (not more items to read), or the writer buffer is full or a checkpoint is triggered.

Parameters:
chunkSize write buffer size
theStatus flags when the read-process reached the last record or a checkpoint is required
Returns:
an array list of objects to write
 
     private List<ObjectreadAndProcess(int chunkSizeItemStatus theStatus) {
         .entering("readAndProcess"new Object[] { chunkSizetheStatus });
 
         List<ObjectchunkToWrite = new ArrayList<Object>();
         Object itemRead = null;
         Object itemProcessed = null;
         int readProcessedCount = 0;
 
         while (true) {
             ItemStatus status = new ItemStatus();
             itemRead = readItem(status);
 
             if (status.isRollback()) {
                 theStatus.setRollback(true);
                 // inc rollbackCount
                 break;
             }
 
             if (!status.isSkipped() && !status.isFinished()) {
                 itemProcessed = processItem(itemReadstatus);
 
                 if (status.isRollback()) {
                     theStatus.setRollback(true);
                     // inc rollbackCount
                     .getMetric(..).incValue();
                     break;
                 }
 
                 if (!status.isSkipped() && !status.isFiltered()) {
                     chunkToWrite.add(itemProcessed);
                     readProcessedCount++;
                 }
             }
 
             theStatus.setFinished(status.isFinished());
             theStatus.setCheckPointed(.ApplyCheckPointPolicy());
 
             // This will force the current item to finish processing on a stop
             // request
             if (.getBatchStatus().equals(.)) {
                 theStatus.setFinished(true);
             }
 
             // write buffer size reached
             if ((readProcessedCount == chunkSize) && (.getCheckpointType() != "custom")) {
                 break;
             }
 
             // checkpoint reached
             if (theStatus.isCheckPointed()) {
                 break;
             }
 
             // last record in readerProxy reached
             if (theStatus.isFinished()) {
                 break;
             }
 
         }
         .exiting("readAndProcess"chunkToWrite);
         return chunkToWrite;
     }

    
Reads an item from the reader

Parameters:
status flags the current read status
Returns:
the item read
 
     private Object readItem(ItemStatus status) {
         .entering("readItem"status);
         Object itemRead = null;
 
         try {
             // call read listeners before and after the actual read
             for (ItemReadListenerProxy readListenerProxy : ) {
                 readListenerProxy.beforeRead();
             }
 
             itemRead = .readItem();
 
             for (ItemReadListenerProxy readListenerProxy : ) {
                 readListenerProxy.afterRead(itemRead);
             }
 
             // itemRead == null means we reached the end of
             // the readerProxy "resultset"
             status.setFinished(itemRead == null);
             if (!status.isFinished()) {
                 .getMetric(..).incValue();
             }
         } catch (Exception e) {
         	.setException(e);
         	for (ItemReadListenerProxy readListenerProxy : ) {
                 readListenerProxy.onReadError(e);
             }
         	if(!) {
         		if (retryReadException(e)) {
         			for (ItemReadListenerProxy readListenerProxy : ) {
                         readListenerProxy.onReadError(e);
                     }
     				 // if not a rollback exception, just retry the current item
         			 if (!.isRollbackException(e)) {
                          itemRead = readItem(status);
         			 } else {
                          status.setRollback(true);
                           = true;
                          // inc rollbackCount
                          .getMetric(..).incValue();
                      }
         		}
         		else if(skipReadException(e)) {
         			status.setSkipped(true);
                     .getMetric(..).incValue();
 
         		}
         		else {
                     throw new BatchContainerRuntimeException(e);
                 }
         	}
         	else {
         		// coming from a rollback retry
         		if(skipReadException(e)) {
         			status.setSkipped(true);
                     .getMetric(..).incValue();
 
         		}
         		else if (retryReadException(e)) {
         			 if (!.isRollbackException(e)) {
                          itemRead = readItem(status);
         			 }
                      else {
                          status.setRollback(true);
                          // inc rollbackCount
                          .getMetric(..).incValue();
                      }
         		}
         		else {
                     throw new BatchContainerRuntimeException(e);
                 }
         	}
 
         } catch (Throwable e) {
             throw new BatchContainerRuntimeException(e);
         }
 
         .exiting("readItem"itemRead);
         return itemRead;
     }

    
Process an item previously read by the reader

Parameters:
itemRead the item read
status flags the current process status
Returns:
the processed item
 
     private Object processItem(Object itemReadItemStatus status) {
         .entering("processItem"new Object[] { itemReadstatus });
         Object processedItem = null;
 
         // if no processor defined for this chunk
         if ( == null){
         	return itemRead;
         }
         
         try {
 
             // call process listeners before and after the actual process call
             for (ItemProcessListenerProxy processListenerProxy : ) {
                 processListenerProxy.beforeProcess(itemRead);
             }
 
             processedItem = .processItem(itemRead);
 
             if (processedItem == null) {
                 // inc filterCount
                 .getMetric(..).incValue();
                 status.setFiltered(true);
             }
 
             for (ItemProcessListenerProxy processListenerProxy : ) {
                 processListenerProxy.afterProcess(itemReadprocessedItem);
             }
         } catch (Exception e) {
         	for (ItemProcessListenerProxy processListenerProxy : ) {
                 processListenerProxy.onProcessError(processedIteme);
             }
         	if(!) {
         		if (retryProcessException(eitemRead)) {
         			if (!.isRollbackException(e)) {
                         // call process listeners before and after the actual
                         // process call
                         for (ItemProcessListenerProxy processListenerProxy : ) {
                             processListenerProxy.beforeProcess(itemRead);
                         }
                         processedItem = processItem(itemReadstatus);
                         if (processedItem == null) {
                             // inc filterCount
                             .getMetric(..).incValue();
                             status.setFiltered(true);
                         }
 
                         for (ItemProcessListenerProxy processListenerProxy : ) {
                             processListenerProxy.afterProcess(itemReadprocessedItem);
                         }
                     } else {
                         status.setRollback(true);
                          = true;
                         // inc rollbackCount
                         .getMetric(..).incValue();
                     }
         		}
         		else if (skipProcessException(eitemRead)) {
         			status.setSkipped(true);
                     .getMetric(..).incValue();
         		}
         		else {
                     throw new BatchContainerRuntimeException(e);
                 }
         	}
         	else {
         		if (skipProcessException(eitemRead)) {
         			status.setSkipped(true);
                     .getMetric(..).incValue();
                  } else if (retryProcessException(eitemRead)) {
         			if (!.isRollbackException(e)) {
                         // call process listeners before and after the actual
                         // process call
                         for (ItemProcessListenerProxy processListenerProxy : ) {
                             processListenerProxy.beforeProcess(itemRead);
                         }
                         processedItem = processItem(itemReadstatus);
                         if (processedItem == null) {
                             // inc filterCount
                             .getMetric(..).incValue();
                             status.setFiltered(true);
                         }
 
                         for (ItemProcessListenerProxy processListenerProxy : ) {
                             processListenerProxy.afterProcess(itemReadprocessedItem);
                         }
                     } else {
                         status.setRollback(true);
                          = true;
                         // inc rollbackCount
                         .getMetric(..).incValue();
                     }
         		} else {
         			throw new BatchContainerRuntimeException(e);
                  }
         	}
 
         } catch (Throwable e) {
             throw new BatchContainerRuntimeException(e);
         }
 
         .exiting("processItem"processedItem);
         return processedItem;
     }

    
Writes items

Parameters:
theChunk the array list with all items processed ready to be written
 
     private void writeChunk(List<ObjecttheChunkItemStatus status) {
         .entering("writeChunk"theChunk);
         if (!theChunk.isEmpty()) {
             try {
 
                 // call read listeners before and after the actual read
                 for (ItemWriteListenerProxy writeListenerProxy : ) {
                     writeListenerProxy.beforeWrite(theChunk);
                 }
 
                 .writeItems(theChunk);
 
                 for (ItemWriteListenerProxy writeListenerProxy : ) {
                     writeListenerProxy.afterWrite(theChunk);
                 }
                 .getMetric(..).incValueBy(theChunk.size());
             } catch (Exception e) {
             	this..setException(e);
             	for (ItemWriteListenerProxy writeListenerProxy : ) {
                     writeListenerProxy.onWriteError(theChunke);
                 }
             	if(!)
             	{
             		if (retryWriteException(etheChunk)) {
                         if (!.isRollbackException(e)) {
                             writeChunk(theChunkstatus);
                         } else {
                         	 = true;
                             status.setRollback(true);
                             // inc rollbackCount
                             .getMetric(..).incValue();
                         }
                     } else if (skipWriteException(etheChunk)) {
                         .getMetric(..).incValueBy(1);
                     } else {
                     	throw new BatchContainerRuntimeException(e);
                     }
             		
             	}
             	else {
             		if (skipWriteException(etheChunk)) {
                         .getMetric(..).incValueBy(1);
                     } else if (retryWriteException(etheChunk)) {
                         if (!.isRollbackException(e)) {
                         	status.setRetry(true);
                             writeChunk(theChunkstatus);
                         } else {
                         	 = true;
                             status.setRollback(true);
                             // inc rollbackCount
                             .getMetric(..).incValue();
                         }
                     } else {
                         throw new BatchContainerRuntimeException(e);
                     }
             	}
 
             } catch (Throwable e) {
                 throw new BatchContainerRuntimeException(e);
             }
         }
         .exiting("writeChunk");
     }

    
Main Read-Process-Write loop

 
     private void invokeChunk() throws Exception {
         .entering("invokeChunk2");
 
         int itemCount = ChunkHelper.getItemCount();
         int timeInterval = ChunkHelper.getTimeLimit();
         List<ObjectchunkToWrite = new ArrayList<Object>();
         boolean checkPointed = true;
         boolean rollback = false;
 
         // begin new transaction at first iteration or after a checkpoint commit
 
         try {
 
             .begin();
             this.openReaderAndWriter();
             .commit();
 
             while (true) {
             	
                 if (checkPointed || rollback) {
                 	if (this..getCheckpointType() == "custom" ){
                 		int newtimeOut = this..checkpointTimeout();
                 		.setTransactionTimeout(newtimeOut);
                 	}
                     .begin();
                     for (ChunkListenerProxy chunkProxy : ) {
                         chunkProxy.beforeChunk();
                     }
                     
                     if (rollback) {
                         positionReaderAtCheckpoint();
                         positionWriterAtCheckpoint();
                          = new CheckpointManager(,
                                 getCheckpointAlgorithm(itemCounttimeInterval), .getExecutionId(), 
                                         .getJobInstance().getInstanceId(), .getId());
                     }
                 }
               
                 ItemStatus status = new ItemStatus();
                 
                 if (rollback) {
                     rollback = false;
                 }
 
                 chunkToWrite = readAndProcess(itemCountstatus);
 
                 if (status.isRollback()) {
                     itemCount = 1;
                     rollback = true;
                     .rollback();
                     continue;
                 }
                 
                 writeChunk(chunkToWritestatus);
                 
                 if (status.isRollback()) {
                     itemCount = 1;
                     rollback = true;
                     .rollback();
                     continue;
                 }
                 checkPointed = status.isCheckPointed();
 
                 // we could finish the chunk in 3 conditions: buffer is full,
                 // checkpoint, not more input
                 if (status.isCheckPointed() || status.isFinished()) {
                     // TODO: missing before checkpoint listeners
                     // 1.- check if spec list proper steps for before checkpoint
                     // 2.- ask Andy about retry
                     // 3.- when do we stop?
 
                     .checkpoint();
 
                     for (ChunkListenerProxy chunkProxy : ) {
                         chunkProxy.afterChunk();
                     }
                     
                     this..beginCheckpoint();
                     
                     .commit();
                     
                     this..endCheckpoint();
                     
                     if ( != null) {
 
                     	Serializable data = this..collectPartitionData();
 
                         if (this. != null) {
                             // Invoke the partition analayzer at the end of each step if
                             // the step runs
 
                             PartitionDataWrapper dataWrapper = new PartitionDataWrapper();
                             dataWrapper.setCollectorData(data);
                             dataWrapper.setEventType(.);
                             .add(dataWrapper);
                         }
 
                     }                                        
                     
                     // exit loop when last record is written
                     if (status.isFinished()) {
                         .begin();
                         
                         .close();
                         .close();
                         
                         .commit();
                         // increment commitCount
                         .getMetric(..).incValue();
                         break;
                     } else {
                         // increment commitCount
                         .getMetric(..).incValue();
                     }
 
                 }
 
             }
             
         } catch (Throwable e) {
         	for (ChunkListenerProxy chunkProxy : ) {
                 chunkProxy.onError();
             }
             .rollback();
             .log(."Failure in Read-Process-Write Loop, transaction is being rolled back."e);
             throw new BatchContainerRuntimeException(e);
         }
         .exiting("invokeChunk");
     }
 
     protected void invokeCoreStep() throws BatchContainerServiceException {
 
         this. = .getChunk();
 
         initializeChunkArtifacts();
 
         try {
             invokeChunk();
         } catch (Exception re) {
             throw new BatchContainerServiceException(re);
         } finally {
             if ( != null) {
 
             	Serializable data = this..collectPartitionData();
 
                 if (this. != null) {
                     // Invoke the partition analayzer at the end of each step if
                     // the step runs
 
                     PartitionDataWrapper dataWrapper = new PartitionDataWrapper();
                     dataWrapper.setCollectorData(data);
                     dataWrapper.setEventType(.);
                     .add(dataWrapper);
                 }
 
             }
 
             if (this. != null) {
                 PartitionDataWrapper dataWrapper = new PartitionDataWrapper();
                 dataWrapper.setBatchStatus(.getBatchStatus());
                 dataWrapper.setExitStatus(.getExitStatus());
                 dataWrapper.setEventType(.);
                 .add(dataWrapper);
             }
         }
 
         // TODO invoke analyzeExitStatus in analyzer if it exists
     }
 
     private CheckpointAlgorithm getCheckpointAlgorithm(int itemCountint timeInterval) {
         CheckpointAlgorithm alg = null;
 
         if (.getCheckpointType() == "item") {
             alg = new ItemCheckpointAlgorithm();
             ((ItemCheckpointAlgorithmalg).setThresholds(itemCounttimeInterval);
         } else { // custom chkpt alg
             alg = (CheckpointAlgorithm;
         }
 
         return alg;
     }
 
     /*
      * Initialize itemreader, itemwriter, and item processor checkpoint
      */
     private void initializeChunkArtifacts() {
         String sourceMethod = "initializeChunkArtifacts";
         if (.isLoggable(.))
             .entering(sourceMethod);
 
         int itemCount = ChunkHelper.getItemCount();
         int timeInterval = ChunkHelper.getTimeLimit();
         String checkpointPolicy = ChunkHelper.getCheckpointPolicy();
         
         ItemReader itemReader = .getReader();
         List<PropertyitemReaderProps = itemReader.getProperties() == null ? null : itemReader.getProperties().getPropertyList();
         try {
             InjectionReferences injectionRef = new InjectionReferences(.getJobContext(), 
                     itemReaderProps);
 
              = ProxyFactory.createItemReaderProxy(itemReader.getRef(), injectionRef);
 
             if (.isLoggable(.)) {
                 .fine("Created ItemReaderProxy for " + itemReader.getRef());
             }
         } catch (ArtifactValidationException e) {
             throw new BatchContainerServiceException("Cannot create the ItemReader [" + itemReader.getRef() + "]"e);
         }
 
         ItemProcessor itemProcessor = .getProcessor();
         if (itemProcessor != null){
         	List<PropertyitemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
         	try {
 
         				itemProcessorProps);
 
         		 = ProxyFactory.createItemProcessorProxy(itemProcessor.getRef(), injectionRef);
         		if (.isLoggable(.)) {
         			.fine("Created ItemProcessorProxy for " + itemProcessor.getRef());
         		}
         	} catch (ArtifactValidationException e) {
         		throw new BatchContainerServiceException("Cannot create the ItemProcessor [" + itemProcessor.getRef() + "]"e);
         	}
         }
         
         ItemWriter itemWriter = .getWriter();
         List<PropertyitemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
         try {
             InjectionReferences injectionRef = new InjectionReferences(.getJobContext(), 
                     itemWriterProps);
 
              = ProxyFactory.createItemWriterProxy(itemWriter.getRef(), injectionRef);
             if (.isLoggable(.)) {
                 .fine("Created ItemWriterProxy for " + itemWriter.getRef());
             }
         } catch (ArtifactValidationException e) {
             throw new BatchContainerServiceException("Cannot create the ItemWriter [" + itemWriter.getRef() + "]"e);
         }
 
         try {
             List<PropertypropList = null;
 
             if (.getCheckpointAlgorithm() != null) {
 
                 propList = (.getCheckpointAlgorithm().getProperties() == null) ? null : .getCheckpointAlgorithm().getProperties()
                         .getPropertyList();
             }
 
             InjectionReferences injectionRef = new InjectionReferences(.getJobContext(), 
                     propList);
 
              = CheckpointAlgorithmFactory.getCheckpointAlgorithmProxy(injectionRef);
             if (.isLoggable(.)) {
                 .fine("Created CheckpointAlgorithmProxy for policy [" + checkpointPolicy + "]");
             }
         } catch (ArtifactValidationException e) {
             throw new BatchContainerServiceException("Cannot create the CheckpointAlgorithm for policy [" + .getCheckpointPolicy()
                     + "]"e);
         }
 
                 null);
 
 
         if (.getCheckpointType() == "item") {
              = new ItemCheckpointAlgorithm();
             ((ItemCheckpointAlgorithm).setThresholds(itemCounttimeInterval);
         } else { // custom chkpt alg
              = (CheckpointAlgorithm;
         }
 
         if (.isLoggable(.)) {
             .fine("Setting contexts for chunk artifacts");
         }
 
         if (.isLoggable(.))
             .fine("Initialize checkpoint manager with item-count=" + itemCount);
         .fine("Initialize checkpoint manager with time-interval=" + timeInterval);
 
                 .getJobInstance().getInstanceId(), .getId());
 
 
 
 
         if (.isLoggable(.))
             .exiting(sourceMethod);
     }
 
     private void openReaderAndWriter() {
         String sourceMethod = "openReaderAndWriter";
 
         if (.isLoggable(.))
             .entering(sourceMethod);
 
         try {
 
             // check for data in backing store
             if (data.size() >= 1) {
 
                  = (CheckpointDatadata.get(0);
                 byte[] readertoken = .getRestartToken();
                 ByteArrayInputStream readerChkptBA = new ByteArrayInputStream(readertoken);
                 TCCLObjectInputStream readerOIS = null;
                 try {
                     readerOIS = new TCCLObjectInputStream(readerChkptBA);
                     .open((ExternalizablereaderOIS.readObject());
                     readerOIS.close();
                 } catch (Exception ex) {
                     // is this what I should be throwing here?
                     throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + .getId() + "]"ex);
                 }
             } else {
                 // no chkpt data exists in the backing store
                  = null;
                 .open(null);
             }
         } catch (ClassCastException e) {
             throw new IllegalStateException("Expected CheckpointData but found" + data.get(0));
         }
 
 
         try {
             // check for data in backing store
             if (data.size() >= 1) {
                  = (CheckpointDatadata.get(0);
                 byte[] writertoken = .getRestartToken();
                 ByteArrayInputStream writerChkptBA = new ByteArrayInputStream(writertoken);
                 TCCLObjectInputStream writerOIS = null;
                 try {
                     writerOIS = new TCCLObjectInputStream(writerChkptBA);
                     .open((ExternalizablewriterOIS.readObject());
                     writerOIS.close();
                 } catch (Exception ex) {
                     // is this what I should be throwing here?
                     throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + .getId() + "]"ex);
                 }
             } else {
                 // no chkpt data exists in the backing store
                  = null;
                 .open(null);
             }
         } catch (ClassCastException e) {
             throw new IllegalStateException("Expected Checkpoint but found" + data.get(0));
         }
 
         // set up metrics
         // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_COUNT"), 0);
         // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_COUNT"), 0);
         // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_SKIP_COUNT"),
         // 0);
         // stepContext.addMetric(MetricImpl.Counter.valueOf("PROCESS_SKIP_COUNT"),
         // 0);
         // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_SKIP_COUNT"),
         // 0);
 
         if (.isLoggable(.))
             .exiting(sourceMethod);
     }
 
     @Override
     public void stop() {
 
         // we don't need to call stop on the chunk implementation here since a
         // chunk always returns control to
         // the batch container after every item.
 
     }
 
     boolean skipReadException(Exception e) {
 
         try {
             .handleExceptionRead(e);
         } catch (BatchContainerRuntimeException bcre) {
             return false;
         }
 
         return true;
 
     }
 
     boolean retryReadException(Exception e) {
 
         try {
             .handleExceptionRead(e);
         } catch (BatchContainerRuntimeException bcre) {
             return false;
         }
 
         return true;
 
     }
 
     boolean skipProcessException(Exception eObject record) {
 
         try {
             .handleExceptionWithRecordProcess(erecord);
         } catch (BatchContainerRuntimeException bcre) {
             return false;
         }
 
         return true;
 
     }
 
     boolean retryProcessException(Exception eObject record) {
 
         try {
             .handleExceptionProcess(erecord);
         } catch (BatchContainerRuntimeException bcre) {
             return false;
         }
 
         return true;
 
     }
 
     boolean skipWriteException(Exception eList<ObjectchunkToWrite) {
 
         Object writeObjs[] = chunkToWrite.toArray();
         for (int i = 0; i < writeObjs.lengthi++) {
             try {
                 .handleExceptionWithRecordListWrite(echunkToWrite);
             } catch (BatchContainerRuntimeException bcre) {
                 return false;
             }
         }
 
         return true;
 
     }