Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    * Copyright 2012 International Business Machines Corp.
    * 
    * See the NOTICE file distributed with this work for additional information
    * regarding copyright ownership. Licensed under the Apache License, 
    * Version 2.0 (the "License"); you may not use this file except in compliance
    * with the License. You may obtain a copy of the License at
    * 
    *   http://www.apache.org/licenses/LICENSE-2.0
   * 
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  package com.ibm.jbatch.container.services.impl;
  
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
  
  
  
  
  	private static final String CLASSNAME = JDBCPersistenceManagerImpl.class.getName();
  
  	private final static Logger logger = Logger.getLogger();
  
  	private IBatchConfig batchConfig = null;
  
  	protected DataSource dataSource = null;
  	protected String jndiName = null;
  
  	protected String driver = ""
  	protected String schema = "";
  	protected String url = ""
  	protected String userId = "";
  	protected String pwd = "";
  
  	/* (non-Javadoc)
  	 * @see com.ibm.jbatch.container.services.impl.AbstractPersistenceManagerImpl#init(com.ibm.jbatch.container.IBatchConfig)
  	 */
  	public void init(IBatchConfig batchConfigthrows BatchContainerServiceException {
  		.config("Entering CLASSNAME.init(), batchConfig =" + batchConfig);
  
  		this. = batchConfig;
  
 
 		if (!batchConfig.isJ2seMode()) {
 
 			.config("JNDI name = " + );
 
 			if ( == null || .equals("")) {
 				throw new BatchContainerServiceException("JNDI name is not defined.");
 			}
 
 			try {
 				Context ctx = new InitialContext();
 
 			} catch (NamingException e) {
 				.severe("Lookup failed for JNDI name: " +  + 
 						".  One cause of this could be that the batch runtime is incorrectly configured to EE mode when it should be in SE mode.");
 			}
 
 		} else {
 
 			.config("driver: " +  + ", url: " + );
 		}
 
 		try {
 			// only auto-create on Derby
 			if(isDerby()) {	
 				if(!isSchemaValid()) {
 				}
 			}
 		} catch (SQLException e) {
 		}
 
 		.config("Exiting CLASSNAME.init()");
 	}

Checks if the default schema JBATCH or the schema defined in batch-config exists.

Returns:
true if the schema exists, false otherwise.
Throws:
java.sql.SQLException
 
 	private boolean isSchemaValid() throws SQLException {
 		.entering("isSchemaValid");
 		DatabaseMetaData dbmd = conn.getMetaData();
 		ResultSet rs = dbmd.getSchemas();
 		while(rs.next()) {
 			if (.equalsIgnoreCase(rs.getString("TABLE_SCHEM")) ) {
 				cleanupConnection(connrsnull);
 				.exiting("isSchemaValid"true);
 				return true;
 			}
 		}
 		cleanupConnection(connrsnull);
 		.exiting("isSchemaValid"false);
 		return false;
 	}
 
 	private boolean isDerby() throws SQLException {
 		.entering("isDerby");
 		DatabaseMetaData dbmd = conn.getMetaData();
 		boolean derby = dbmd.getDatabaseProductName().toLowerCase().indexOf("derby") > 0;
 		.exiting("isDerby"derby);
 		return derby;
 	}

Creates the default schema JBATCH or the schema defined in batch-config.

 
 	private void createSchema() throws SQLException {
 		.entering("createSchema");
 
 		.log(. + " schema does not exists. Trying to create it.");
 		PreparedStatement ps = null;
 		ps = conn.prepareStatement("CREATE SCHEMA " + );
 		ps.execute();
 
 		cleanupConnection(connnullps);
 		.exiting("createSchema");
 	}

Checks if all the runtime batch table exists. If not, it creates them.

 
 	private void checkAllTables() throws SQLException {
 		.entering("checkAllTables");
 
 
 
 
 		.exiting("checkAllTables");
 	}

Creates tableName using the createTableStatement DDL.

Parameters:
tableName
createTableStatement
Throws:
java.sql.SQLException
 
 	private void createIfNotExists(String tableNameString createTableStatementthrows SQLException {
 		.entering("createIfNotExists"new Object[] {tableNamecreateTableStatement});
 
 		DatabaseMetaData dbmd = conn.getMetaData();
 		ResultSet rs = dbmd.getTables(nulltableNamenull);
 		PreparedStatement ps = null;
 		if(!rs.next()) {
 			.log(.tableName + " table does not exists. Trying to create it.");
 			ps = conn.prepareStatement(createTableStatement);
 		}
 
 		cleanupConnection(connrsps);
 		.exiting("createIfNotExists");
 	}

Executes the provided SQL statement

Parameters:
statement
Throws:
java.sql.SQLException
 
 	private void executeStatement(String statementthrows SQLException {
 		.entering("executeStatement"statement);
 
 		PreparedStatement ps = null;
 
 		ps = conn.prepareStatement(statement);
 
 		cleanupConnection(connps);
 		.exiting("executeStatement");
 	}
 
 
 	/* (non-Javadoc)
 	 * @see com.ibm.jbatch.container.services.impl.AbstractPersistenceManagerImpl#createCheckpointData(com.ibm.ws.batch.container.checkpoint.CheckpointDataKey, com.ibm.ws.batch.container.checkpoint.CheckpointData)
 	 */
 	public void createCheckpointData(CheckpointDataKey keyCheckpointData value) {
 		.entering("createCheckpointData"new Object[] {keyvalue});
 		.exiting("createCheckpointData");
 	}
 
 	/* (non-Javadoc)
 	 * @see com.ibm.jbatch.container.services.impl.AbstractPersistenceManagerImpl#getCheckpointData(com.ibm.ws.batch.container.checkpoint.CheckpointDataKey)
 	 */
 		.entering("getCheckpointData"key==null ? "<null>" : key);
 		.exiting("getCheckpointData"checkpointData==null ? "<null>" : checkpointData);
 		return checkpointData;
 	}
 
 	/* (non-Javadoc)
 	 * @see com.ibm.jbatch.container.services.impl.AbstractPersistenceManagerImpl#updateCheckpointData(com.ibm.ws.batch.container.checkpoint.CheckpointDataKey, com.ibm.ws.batch.container.checkpoint.CheckpointData)
 	 */
 	public void updateCheckpointData(CheckpointDataKey keyCheckpointData value) {
 		.entering("updateCheckpointData"new Object[] {keyvalue});
 		if(data != null) {
 		} else {
 			createCheckpointData(keyvalue);
 		}
 		.exiting("updateCheckpointData");
 	}


Returns:
the database connection and sets it to the default schema JBATCH or the schema defined in batch-config.
Throws:
java.sql.SQLException
 
 	protected Connection getConnection() throws SQLException {
 		.finest("Entering: " +  + ".getConnection");
 		Connection connection = null;
 
 			.finest("J2EE mode, getting connection from data source");
 			connection = .getConnection();
 			.finest("autocommit="+connection.getAutoCommit());
 		} else {
 			try {
 				Class.forName();
 			} catch (ClassNotFoundException e) {
 				throw new PersistenceException(e);
 			}
 			.finest("JSE mode, getting connection from " + );
 			connection = DriverManager.getConnection();
 			.finest("autocommit="+connection.getAutoCommit());
 		}
 		setSchemaOnConnection(connection);
 
 		.finest("Exiting: " +  + ".getConnection() with conn =" + connection);
 		return connection;
 	}

Returns:
the database connection. The schema is set to whatever default its used by the underlying database.
Throws:
java.sql.SQLException
 
 		.finest("Entering getConnectionToDefaultSchema");
 		Connection connection = null;
 
 			.finest("J2EE mode, getting connection from data source");
 			try {
 				connection = .getConnection();
 			} catch(SQLException e) {
 				logException("FAILED GETTING DATABASE CONNECTION"e);
 				throw new PersistenceException(e);
 			}
 			.finest("autocommit="+connection.getAutoCommit());
 		} else {
 			try {
 				Class.forName();
 			} catch (ClassNotFoundException e) {
 				logException("ClassNotFoundException: Cannot load driver class: " + e);
 				throw new PersistenceException(e);
 			}
 			.finest("JSE mode, getting connection from " + );
 			try {
 				connection = DriverManager.getConnection();
 			} catch (SQLException e) {
 				logException("FAILED GETTING DATABASE CONNECTION.  FOR EMBEDDED DERBY CHECK FOR OTHER USER LOCKING THE CURRENT DATABASE (Try using a different database instance)."e); 
 				throw new PersistenceException(e);
 			}
 			.finest("autocommit="+connection.getAutoCommit());
 		}
 		.finest("Exiting from getConnectionToDefaultSchema, conn= " +connection);
 		return connection;
 	}
 
 	private void logException(String msgException e) {
 		StringWriter sw = new StringWriter();
 		PrintWriter pw = new PrintWriter(sw);
 
 		.log(.msg +  "; Exception stack trace: " + sw);
 	}

Set the default schema JBATCH or the schema defined in batch-config on the connection object.

Parameters:
connection
Throws:
java.sql.SQLException
 
 	private void setSchemaOnConnection(Connection connectionthrows SQLException {
 		.finest("Entering " +  +".setSchemaOnConnection()");
 
 		PreparedStatement ps = null;
 		ps = connection.prepareStatement("SET SCHEMA ?");
 		ps.setString(1, );
 		ps.executeUpdate(); 
 
 		ps.close();
 
 		.finest("Exiting " +  +".setSchemaOnConnection()");
 	}

select data from DB table

Parameters:
key - the IPersistenceDataKey object
Returns:
List of serializable objects store in the DB table Ex. select id, obj from tablename where id = ?
 
 		.entering("queryCheckpointData"new Object[] {key});
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		ObjectInputStream objectIn = null;
 		CheckpointData data = null;
 		try {
 			conn = getConnection();
 			statement.setObject(1, key);
 			rs = statement.executeQuery();
 			if (rs.next()) {
 				byte[] buf = rs.getBytes("obj");
 			}
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		} catch (IOException e) {
 			throw new PersistenceException(e);
 		} catch (ClassNotFoundException e) {
 			throw new PersistenceException(e);
 		} finally {
 			if (objectIn != null) {
 				try {
 					objectIn.close();
 				} catch (IOException e) {
 					throw new PersistenceException(e);
 				}
 			}
 			cleanupConnection(connrsstatement);
 		}
 		.exiting("queryCheckpointData");
 		return data;	
 	}


insert data to DB table

Parameters:
key - the IPersistenceDataKey object
value - serializable object to store Ex. insert into tablename values(?, ?)
 
 	private <T> void insertCheckpointData(Object key, T value) {
 		.entering("insertCheckpointData"new Object[] {keyvalue});
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ByteArrayOutputStream baos = null;
 		ObjectOutputStream oout = null;
 		byte[] b;
 		try {
 			conn = getConnection();
 			baos = new ByteArrayOutputStream();
 			oout = new ObjectOutputStream(baos);
 			oout.writeObject(value);
 
 			b = baos.toByteArray();
 
 			statement.setObject(1, key);
 			statement.setBytes(2, b);
 			statement.executeUpdate();
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		} catch (IOException e) {
 			throw new PersistenceException(e);
 		} finally {
 			if (baos != null) {
 				try {
 					baos.close();
 				} catch (IOException e) {
 					throw new PersistenceException(e);
 				}
 			}
 			if (oout != null) {
 				try {
 					oout.close();
 				} catch (IOException e) {
 					throw new PersistenceException(e);
 				}
 			}
 			cleanupConnection(connnullstatement);
 		}
 		.exiting("insertCheckpointData");
 	}

update data in DB table

Parameters:
value - serializable object to store
key - the IPersistenceDataKey object
query - SQL statement to execute. Ex. update tablename set obj = ? where id = ?
 
 	private void updateCheckpointData(Object keyCheckpointData value) {
 		.entering("updateCheckpointData"new Object[] {keyvalue});
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ByteArrayOutputStream baos = null;
 		ObjectOutputStream oout = null;
 		byte[] b;
 		try {
 			conn = getConnection();
 			baos = new ByteArrayOutputStream();
 			oout = new ObjectOutputStream(baos);
 			oout.writeObject(value);
 
 			b = baos.toByteArray();
 
 			statement.setBytes(1, b);
 			statement.setObject(2, key);
 			statement.executeUpdate();
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		} catch (IOException e) {
 			throw new PersistenceException(e);
 		} finally {
 			if (baos != null) {
 				try {
 					baos.close();
 				} catch (IOException e) {
 					throw new PersistenceException(e);
 				}
 			}
 			if (oout != null) {
 				try {
 					oout.close();
 				} catch (IOException e) {
 					throw new PersistenceException(e);
 				}
 			}
 			cleanupConnection(connnullstatement);
 		}
 		.exiting("updateCheckpointData");
 	}



closes connection, result set and statement

Parameters:
conn - connection object to close
rs - result set object to close
statement - statement object to close
 
 	private void cleanupConnection(Connection connResultSet rsPreparedStatement statement) {
 
 		.logp(."cleanupConnection""Entering",  new Object[] {connrs==null ? "<null>" : rsstatement==null ? "<null>" : statement});
 
 		if (statement != null) {
 			try {
 				statement.close();
 			} catch (SQLException e) {
 				throw new PersistenceException(e);
 			}
 		}
 
 		if (rs != null) {
 			try {
 				rs.close();
 			} catch (SQLException e) {
 				throw new PersistenceException(e);
 			}
 		}
 
 		if (conn != null) {
 			try {
 				conn.close();
 			} catch (SQLException e) {
 				throw new PersistenceException(e);
 			} finally {
 				try {
 					conn.close();
 				} catch (SQLException e) {
 					throw new PersistenceException(e);
 				}
 			}
 		}
 		.logp(."cleanupConnection""Exiting");
 	}

closes connection and statement

Parameters:
conn - connection object to close
statement - statement object to close
 
 	private void cleanupConnection(Connection connPreparedStatement statement) {
 
 		.logp(."cleanupConnection""Entering",  new Object[] {connstatement});
 
 		if (statement != null) {
 			try {
 				statement.close();
 			} catch (SQLException e) {
 				throw new PersistenceException(e);
 			}
 		}
 
 		if (conn != null) {
 			try {
 				conn.close();
 			} catch (SQLException e) {
 				throw new PersistenceException(e);
 			} finally {
 				try {
 					conn.close();
 				} catch (SQLException e) {
 					throw new PersistenceException(e);
 				}
 			}
 		}
 		.logp(."cleanupConnection""Exiting");
 	}
 
 
 	public int jobOperatorGetJobInstanceCount(String jobNameString appTag) {
 		
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		int count;
 
 		try {
 			conn = getConnection();
 			statement = conn.prepareStatement("select count(jobinstanceid) as jobinstancecount from jobinstancedata where name = ? and apptag = ?");
 			statement.setString(1, jobName);
 			statement.setString(2, appTag);
 			rs = statement.executeQuery();
 			rs.next();
 			count = rs.getInt("jobinstancecount");
 
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		}
 		finally {
 			cleanupConnection(connrsstatement);
 		}
 		return count;
 	}
 	
 	public int jobOperatorGetJobInstanceCount(String jobName) {
 
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		int count;
 
 		try {
 			conn = getConnection();
 			statement.setString(1, jobName);
 			rs = statement.executeQuery();
 			rs.next();
 			count = rs.getInt("jobinstancecount");
 
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		}
 		finally {
 			cleanupConnection(connrsstatement);
 		}
 		return count;
 	}
 
 
 	public List<LongjobOperatorGetJobInstanceIds(String jobNameString appTagint startint count) {
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		List<Longdata = new ArrayList<Long>();
 
 		try {
 			conn = getConnection();
 			statement = conn.prepareStatement("select jobinstanceid from jobinstancedata where name = ? and apptag = ? order by jobinstanceid desc");
 			statement.setObject(1, jobName);
 			statement.setObject(2, appTag);
 			rs = statement.executeQuery();
 			while (rs.next()) {
 				long id = rs.getLong("jobinstanceid");
 				data.add(id);
 			}
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		}
 		finally {
 			cleanupConnection(connrsstatement);
 		}
 
 		if (data.size() > 0){
 			try {
 				return data.subList(startstart+count);
 			}
 			catch (IndexOutOfBoundsException oobEx){
 				return data.subList(startdata.size());
 			}
 		}
 		else return data;
 	}
 	
 	public List<LongjobOperatorGetJobInstanceIds(String jobNameint startint count) {
 
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		List<Longdata = new ArrayList<Long>();
 
 		try {
 			conn = getConnection();
 			statement.setObject(1, jobName);
 			rs = statement.executeQuery();
 			while (rs.next()) {
 				long id = rs.getLong("jobinstanceid");
 				data.add(id);
 			}
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		}
 		finally {
 			cleanupConnection(connrsstatement);
 		}
 
 		if (data.size() > 0){
 			try {
 				return data.subList(startstart+count);
 			}
 			catch (IndexOutOfBoundsException oobEx){
 				return data.subList(startdata.size());
 			}
 		}
 		else return data;
 	}
 
 
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		HashMap<LongStringdata = new HashMap<Long,String>();
 
 		try {
 			conn = getConnection();
 
 			// Filter out 'subjob' parallel execution entries which start with the special character
 			final String filter = "not like '" + . + "%'";
 
 			statement = conn.prepareStatement("select distinct jobinstanceid, name from jobinstancedata where name " + filter );
 			rs = statement.executeQuery();
 			while (rs.next()) {
 				long id = rs.getLong("jobinstanceid");
 				String name = rs.getString("name");
 				data.put(idname);
 			}
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		}
 		finally {
 			cleanupConnection(connrsstatement);
 		}
 
 		return data;
 	}
 
 	public Timestamp jobOperatorQueryJobExecutionTimestamp(long keyTimestampType timestampType) {
 
 
 
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		Timestamp createTimestamp = null;
 		Timestamp endTimestamp = null;
 		Timestamp updateTimestamp = null;
 		Timestamp startTimestamp = null;
 
 		try {
 			conn = getConnection();
 			statement = conn.prepareStatement("select createtime, endtime, updatetime, starttime from executioninstancedata where jobexecid = ?");
 			statement.setObject(1, key);
 			rs = statement.executeQuery();
 			while (rs.next()) {
 				createTimestamp = rs.getTimestamp(1);
 				endTimestamp = rs.getTimestamp(2);
 				updateTimestamp = rs.getTimestamp(3);
 				startTimestamp = rs.getTimestamp(4);
 			}
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		}
 		finally {
 			cleanupConnection(connrsstatement);
 		}
 
 		if (timestampType.equals(.)) {
 			return createTimestamp;
 		} else if (timestampType.equals(.)) {
 			return endTimestamp;
 		} else if (timestampType.equals(.)) {
 			return updateTimestamp;
 		} else if (timestampType.equals(.)) {
 			return startTimestamp;
 		} else {
 			throw new IllegalArgumentException("Unexpected enum value.");
 		}
 	}
 
 
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		String status = null;
 
 		try {
 			conn = getConnection();
 			statement = conn.prepareStatement("select batchstatus from executioninstancedata where jobexecid = ?");
 			statement.setLong(1, key);
 			rs = statement.executeQuery();
 			while (rs.next()) {
 				status = rs.getString(1);
 			}
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		} finally {
 			cleanupConnection(connrsstatement);
 		}
 
 		return status;
 	}
 
 
 
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		String status = null;
 
 		try {
 			conn = getConnection();
 			statement = conn.prepareStatement("select exitstatus from executioninstancedata where jobexecid = ?");
 			statement.setLong(1, key);
 			rs = statement.executeQuery();
 			while (rs.next()) {
 				status = rs.getString(1);
 			}
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		} finally {
 			cleanupConnection(connrsstatement);
 		}
 
 		return status;
 	}
 
 	public long jobOperatorQueryJobExecutionJobInstanceId(long executionIDthrows NoSuchJobExecutionException {
 
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		long jobinstanceID = 0;
 
 		try {
 			conn = getConnection();
 			statement = conn.prepareStatement("select jobinstanceid from executioninstancedata where jobexecid = ?");
 			statement.setLong(1, executionID);
 			rs = statement.executeQuery();
 			if (rs.next()) {
 				jobinstanceID = rs.getLong("jobinstanceid");
 			} else {
 				String msg = "Did not find job instance associated with executionID =" + executionID;
 				.fine(msg);
 				throw new NoSuchJobExecutionException(msg);
 			}
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		}
 		finally {
 			cleanupConnection(connrsstatement);
 		}
 
 		return jobinstanceID;
 	}
 
 	public Properties getParameters(long executionIdthrows NoSuchJobExecutionException {
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 		Properties props = null;
 		ObjectInputStream objectIn = null;
 
 		try {
 			conn = getConnection();
 			statement = conn.prepareStatement("select parameters from executioninstancedata where jobexecid = ?"); 
 			statement.setLong(1, executionId);
 			rs = statement.executeQuery();
 			
 			if (rs.next()) {
 				// get the object based data
 				byte[] buf = rs.getBytes("parameters");
 				props = (Properties)deserializeObject(buf);
 			} else {
 				String msg = "Did not find table entry for executionID =" + executionId;
 				.fine(msg);
 				throw new NoSuchJobExecutionException(msg);
 			}
 
 		} catch (SQLException e) {
 			throw new PersistenceException(e);
 		} catch (IOException e) {
 			throw new PersistenceException(e);
 		} catch (ClassNotFoundException e) {
 			throw new PersistenceException(e);
 		} finally {
 			if (objectIn != null) {
 				try {
 					objectIn.close();
 				} catch (IOException e) {
 					throw new PersistenceException(e);
 				}
 			}
 			cleanupConnection(connrsstatement);
 		}
 
 		return props;
 
 	}
 
 
 
 
 		Connection conn = null;
 		PreparedStatement statement = null;
 		ResultSet rs = null;
 
 		long jobexecid = 0;
 		long stepexecid = 0;
 		String stepname = null;
 		String batchstatus = null;
 		String exitstatus = null;
 		Exception ex = null;
 		long readCount =0;
 		long writeCount = 0;
 		long commitCount = 0;
 		long rollbackCount = 0;
 		long readSkipCount = 0;
 		long processSkipCount = 0;
 		long filterCount = 0;
 		long writeSkipCount = 0;
 		Timestamp startTS = null;
 		Timestamp endTS = null;
 		StepExecutionImpl stepEx = null;
 		ObjectInputStream objectIn = null;
 
 		try {
 			conn = getConnection();
 			statement = conn.prepareStatement("select A.* from stepexecutioninstancedata as A inner join executioninstancedata as B on A.jobexecid = B.jobexecid where B.jobinstanceid = ? order by A.stepexecid desc"); 
 			statement.setLong(1, instanceId);
 			rs = statement.executeQuery();
 			while (rs.next()) {
 				stepname = rs.getString("stepname");
 				if (data.containsKey(stepname)) {
 					continue;
 				} else {
 
 					jobexecid = rs.getLong("jobexecid");
 					batchstatus = rs.getString("batchstatus");
 					exitstatus = rs.getString("exitstatus");
 					readCount = rs.getLong("readcount");
 					writeCount = rs.getLong("writecount");
 					commitCount = rs.getLong("commitcount");
 					rollbackCount = rs.getLong("rollbackcount");
 					readSkipCount = rs.getLong("readskipcount");
 					processSkipCount = rs.getLong("processskipcount");
 					filterCount = rs.getLong("filtercount");
 					writeSkipCount = rs.getLong("writeSkipCount");
 					startTS = rs.getTimestamp("startTime");
 					endTS = rs.getTimestamp("endTime");
 					// get the object based data
 					Serializable persistentData = null;
 					byte[] pDataBytes = rs.getBytes("persistentData");
 					if (pDataBytes != null) {
						objectIn = new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes));
						persistentData = (Serializable)objectIn.readObject();
					stepEx = new StepExecutionImpl(jobexecidstepexecid);
					stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
					stepEx.setExitStatus(exitstatus);
					stepEx.setStepName(stepname);
					stepEx.setReadCount(readCount);
					stepEx.setWriteCount(writeCount);
					stepEx.setCommitCount(commitCount);
					stepEx.setRollbackCount(rollbackCount);
					stepEx.setReadSkipCount(readSkipCount);
					stepEx.setProcessSkipCount(processSkipCount);
					stepEx.setFilterCount(filterCount);
					stepEx.setWriteSkipCount(writeSkipCount);
					stepEx.setStartTime(startTS);
					stepEx.setEndTime(endTS);	
					stepEx.setPersistentUserData(persistentData);
					data.put(stepnamestepEx);
catch (SQLException e) {
			throw new PersistenceException(e);
catch (IOException e) {
			throw new PersistenceException(e);
catch (ClassNotFoundException e) {
			throw new PersistenceException(e);
finally {
			cleanupConnection(connrsstatement);
		return data;
		Connection conn = null;
		PreparedStatement statement = null;
		ResultSet rs = null;
		long jobexecid = 0;
		long stepexecid = 0;
		String stepname = null;
		String batchstatus = null;
		String exitstatus = null;
		Exception ex = null;
		long readCount =0;
		long writeCount = 0;
		long commitCount = 0;
		long rollbackCount = 0;
		long readSkipCount = 0;
		long processSkipCount = 0;
		long filterCount = 0;
		long writeSkipCount = 0;
		Timestamp startTS = null;
		Timestamp endTS = null;
		StepExecutionImpl stepEx = null;
		ObjectInputStream objectIn = null;
		try {
			conn = getConnection();
			statement = conn.prepareStatement("select * from stepexecutioninstancedata where jobexecid = ?"); 
			statement.setLong(1, execid);
			rs = statement.executeQuery();
			while (rs.next()) {
				jobexecid = rs.getLong("jobexecid");
				stepexecid = rs.getLong("stepexecid");
				stepname = rs.getString("stepname");
				batchstatus = rs.getString("batchstatus");
				exitstatus = rs.getString("exitstatus");
				readCount = rs.getLong("readcount");
				writeCount = rs.getLong("writecount");
				commitCount = rs.getLong("commitcount");
				rollbackCount = rs.getLong("rollbackcount");
				readSkipCount = rs.getLong("readskipcount");
				processSkipCount = rs.getLong("processskipcount");
				filterCount = rs.getLong("filtercount");
				writeSkipCount = rs.getLong("writeSkipCount");
				startTS = rs.getTimestamp("startTime");
				endTS = rs.getTimestamp("endTime");
				// get the object based data
				Serializable persistentData = null;
				byte[] pDataBytes = rs.getBytes("persistentData");
				if (pDataBytes != null) {
					objectIn = new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes));
					persistentData = (Serializable)objectIn.readObject();
				stepEx = new StepExecutionImpl(jobexecidstepexecid);
				stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
				stepEx.setExitStatus(exitstatus);
				stepEx.setStepName(stepname);
				stepEx.setReadCount(readCount);
				stepEx.setWriteCount(writeCount);
				stepEx.setCommitCount(commitCount);
				stepEx.setRollbackCount(rollbackCount);
				stepEx.setReadSkipCount(readSkipCount);
				stepEx.setProcessSkipCount(processSkipCount);
				stepEx.setFilterCount(filterCount);
				stepEx.setWriteSkipCount(writeSkipCount);
				stepEx.setStartTime(startTS);
				stepEx.setEndTime(endTS);	
				stepEx.setPersistentUserData(persistentData);
				data.add(stepEx);
catch (SQLException e) {
			throw new PersistenceException(e);
catch (IOException e) {
			throw new PersistenceException(e);
catch (ClassNotFoundException e) {
			throw new PersistenceException(e);
finally {
			cleanupConnection(connrsstatement);
		return data;
    @Override
    public StepExecution getStepExecutionByStepExecutionId(long stepExecId) {
        Connection conn = null;
        PreparedStatement statement = null;
        ResultSet rs = null;
        long