/*
 * Copyright 2006-2009 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
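package org.springframework.batch.item.database;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;

import javax.sql.DataSource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.jdbc.SQLWarningException;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator;
import org.springframework.jdbc.support.SQLExceptionTranslator;
import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
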

Abstract base class for any simple item reader that opens a database cursor and continually retrieves the next row in the ResultSet.

By default the cursor is opened using a separate connection. The ResultSet for the cursor is held open regardless of commits or rollbacks in a surrounding transaction. Clients of this reader are responsible for buffering the items in case they need to be re-presented on a rollback. The provided step implementations handle this buffering, so it is only a concern for anyone writing their own step implementation.

There is an option, setUseSharedExtendedConnection(boolean), that shares the connection used for the cursor with the rest of the step processing. If you set this flag to true then you must wrap the DataSource in an ExtendedConnectionDataSourceProxy to prevent the connection from being closed and released after each commit performed as part of the step processing. You must also use a JDBC driver supporting JDBC 3.0 or later, since the cursor will be opened with the additional option 'HOLD_CURSORS_OVER_COMMIT' enabled.

Each call to org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read() will attempt to map the row at the current position in the ResultSet. There is currently no wrapping of the ResultSet to suppress calls to next(). However, if the RowMapper (mistakenly) advances the cursor, the next call to read will verify that the current row is at the expected position and throw a DataAccessException if it is not. This strictness is needed to keep control over transactions and restartability: it ensures that each call to org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read() returns the ResultSet at the correct row, regardless of rollbacks or restarts.

org.springframework.batch.item.ExecutionContext: The current row is returned as restart data, and when restored from that same data, the cursor is opened and the current row set to the value within the restart data. See setDriverSupportsAbsolute(boolean) for improving restart performance.

Calling close on this org.springframework.batch.item.ItemStream frees all resources it is currently using (Connection, ResultSet, etc.). It is then illegal to call org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read() again until the stream has been re-opened.

Known limitation: when used with Derby, setVerifyCursorPosition(boolean) needs to be false, because the java.sql.ResultSet.getRow() call used for cursor position verification is not available for 'TYPE_FORWARD_ONLY' result sets.

Author(s):
Lucas Ward
Peter Zozom
Robert Kasanicky
Thomas Risberg
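public abstract class AbstractCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
		implements InitializingBean {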

Logger available to subclasses
	protected final Log log = LogFactory.getLog(getClass());
	public static final int VALUE_NOT_SET = -1;
	private Connection con;
	protected ResultSet rs;
	private DataSource dataSource;
	private int fetchSize = VALUE_NOT_SET;
	private int maxRows = VALUE_NOT_SET;
	private int queryTimeout = VALUE_NOT_SET;
	private boolean ignoreWarnings = true;
	private boolean verifyCursorPosition = true;
	private SQLExceptionTranslator exceptionTranslator;
	private boolean initialized = false;
	private boolean driverSupportsAbsolute = false;
	private boolean useSharedExtendedConnection = false;
	public AbstractCursorItemReader() {
		super();
	}

Assert that mandatory properties are set.

Throws:
java.lang.IllegalArgumentException if either data source or sql properties not set.
	public void afterPropertiesSet() throws Exception {
		Assert.notNull(dataSource, "DataSource must be provided");
	}

Public setter for the data source for injection purposes.

Parameters:
dataSource
	public void setDataSource(DataSource dataSource) {
		this.dataSource = dataSource;
	}

Public getter for the data source.

Returns:
the dataSource
	public DataSource getDataSource() {
		return this.dataSource;
	}

Prepare the given JDBC Statement (or PreparedStatement or CallableStatement), applying statement settings such as fetch size, max rows, and query timeout.

Parameters:
stmt the JDBC Statement to prepare
Throws:
java.sql.SQLException
See also:
setFetchSize(int)
setMaxRows(int)
setQueryTimeout(int)
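	protected void applyStatementSettings(PreparedStatement stmt) throws SQLException {
		// Each setting is applied only if it was explicitly configured.
		if (fetchSize != VALUE_NOT_SET) {
			stmt.setFetchSize(fetchSize);
		}
		if (maxRows != VALUE_NOT_SET) {
			stmt.setMaxRows(maxRows);
		}
		if (queryTimeout != VALUE_NOT_SET) {
			stmt.setQueryTimeout(queryTimeout);
		}
	}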
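The sentinel pattern behind applyStatementSettings can be tried without a database. The following is a minimal sketch, not Spring Batch code: the class StatementSettingsSketch and its recording proxy are hypothetical helpers invented for illustration, and the proxy merely stands in for a real driver statement. It shows that a setting left at VALUE_NOT_SET is never pushed to the statement.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class StatementSettingsSketch {
	static final int VALUE_NOT_SET = -1;

	// Push a setting to the statement only when it was explicitly configured.
	static void apply(PreparedStatement stmt, int fetchSize, int maxRows, int queryTimeout) throws SQLException {
		if (fetchSize != VALUE_NOT_SET) {
			stmt.setFetchSize(fetchSize);
		}
		if (maxRows != VALUE_NOT_SET) {
			stmt.setMaxRows(maxRows);
		}
		if (queryTimeout != VALUE_NOT_SET) {
			stmt.setQueryTimeout(queryTimeout);
		}
	}

	// Hypothetical stand-in for a driver statement: records "method(arg)" for every call.
	static PreparedStatement recordingStatement(List<String> calls) {
		return (PreparedStatement) Proxy.newProxyInstance(
				StatementSettingsSketch.class.getClassLoader(),
				new Class<?>[] { PreparedStatement.class },
				(proxy, method, args) -> {
					calls.add(method.getName() + "(" + (args == null ? "" : args[0]) + ")");
					return null; // only void setters are invoked in this sketch
				});
	}

	// Convenience wrapper that returns the recorded calls for the given settings.
	static List<String> appliedCalls(int fetchSize, int maxRows, int queryTimeout) {
		List<String> calls = new ArrayList<>();
		try {
			apply(recordingStatement(calls), fetchSize, maxRows, queryTimeout);
		}
		catch (SQLException e) {
			throw new IllegalStateException(e);
		}
		return calls;
	}

	public static void main(String[] argv) {
		// maxRows is left unset, so no setMaxRows call is recorded.
		System.out.println(appliedCalls(100, VALUE_NOT_SET, 30));
	}
}
```

Using -1 as the sentinel keeps zero available as a legitimate value, which matters because JDBC gives zero its own meaning for these settings (for example, no limit for max rows).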

Return the exception translator for this instance. Creates a default SQLErrorCodeSQLExceptionTranslator for the specified DataSource if none is set.
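	protected SQLExceptionTranslator getExceptionTranslator() {
		synchronized(this) {
			if (exceptionTranslator == null) {
				if (dataSource != null) {
					exceptionTranslator = new SQLErrorCodeSQLExceptionTranslator(dataSource);
				}
				else {
					exceptionTranslator = new SQLStateSQLExceptionTranslator();
				}
			}
		}
		return exceptionTranslator;
	}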

Throw a SQLWarningException if we're not ignoring warnings, else log the warnings (at debug level).

Parameters:
statement the current statement to obtain the warnings from, if there are any.
Throws:
java.sql.SQLException
See also:
org.springframework.jdbc.SQLWarningException
	protected void handleWarnings(Statement statement) throws SQLWarningException,
			SQLException {
		if (ignoreWarnings) {
			if (log.isDebugEnabled()) {
				SQLWarning warningToLog = statement.getWarnings();
				while (warningToLog != null) {
					log.debug("SQLWarning ignored: SQL state '" + warningToLog.getSQLState() + "', error code '"
							+ warningToLog.getErrorCode() + "', message [" + warningToLog.getMessage() + "]");
					warningToLog = warningToLog.getNextWarning();
				}
			}
		}
		else {
			SQLWarning warnings = statement.getWarnings();
			if (warnings != null) {
				throw new SQLWarningException("Warning not ignored", warnings);
			}
		}
	}
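JDBC reports warnings as a linked chain, so logging them means following getNextWarning() until it returns null. The following is a minimal sketch of that traversal using only java.sql.SQLWarning; the class WarningChainSketch and the sample warnings are made up for illustration, and a real driver would attach the chain to the Statement.

```java
import java.sql.SQLWarning;
import java.util.ArrayList;
import java.util.List;

class WarningChainSketch {
	// Collect one description per warning by following the chain to its end.
	static List<String> describe(SQLWarning first) {
		List<String> lines = new ArrayList<>();
		for (SQLWarning w = first; w != null; w = w.getNextWarning()) {
			lines.add("SQL state '" + w.getSQLState() + "', error code '" + w.getErrorCode()
					+ "', message [" + w.getMessage() + "]");
		}
		return lines;
	}

	// Build a two-element chain by hand (illustrative values only).
	static SQLWarning sampleChain() {
		SQLWarning first = new SQLWarning("data truncated", "01004", 0);
		first.setNextWarning(new SQLWarning("privilege not granted", "01007", 0));
		return first;
	}

	public static void main(String[] args) {
		describe(sampleChain()).forEach(System.out::println);
	}
}
```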

Moves the cursor in the ResultSet to the position specified by the row parameter by traversing the ResultSet.

Parameters:
row
	private void moveCursorToRow(int row) {
		try {
			int count = 0;
			while (row != count && rs.next()) {
				count++;
			}
		}
		catch (SQLException se) {
			throw getExceptionTranslator().translate("Attempted to move ResultSet to last committed row", getSql(), se);
		}
	}

Gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed for this ResultSet object. If the fetch size specified is zero, the JDBC driver ignores the value.

Parameters:
fetchSize the number of rows to fetch
See also:
java.sql.ResultSet.setFetchSize(int)
	public void setFetchSize(int fetchSize) {
		this.fetchSize = fetchSize;
	}

Sets the limit for the maximum number of rows that any ResultSet object can contain to the given number.

Parameters:
maxRows the new max rows limit; zero means there is no limit
See also:
java.sql.Statement.setMaxRows(int)
	public void setMaxRows(int maxRows) {
		this.maxRows = maxRows;
	}

Sets the number of seconds the driver will wait for a Statement object to execute to the given number of seconds. If the limit is exceeded, an SQLException is thrown.

Parameters:
queryTimeout the new query timeout limit in seconds; zero means there is no limit
See also:
java.sql.Statement.setQueryTimeout(int)
	public void setQueryTimeout(int queryTimeout) {
		this.queryTimeout = queryTimeout;
	}

Set whether SQLWarnings should be ignored (only logged) or exception should be thrown.

Parameters:
ignoreWarnings if TRUE, warnings are ignored
	public void setIgnoreWarnings(boolean ignoreWarnings) {
		this.ignoreWarnings = ignoreWarnings;
	}

Allow verification of cursor position after current row is processed by RowMapper or RowCallbackHandler. Default value is TRUE.

Parameters:
verifyCursorPosition if true, cursor position is verified
	public void setVerifyCursorPosition(boolean verifyCursorPosition) {
		this.verifyCursorPosition = verifyCursorPosition;
	}

Indicate whether the JDBC driver supports setting the absolute row on a java.sql.ResultSet. It is recommended that this is set to true for JDBC drivers that support ResultSet.absolute(), as it may improve performance, especially if a step fails while working with a large data set.

Parameters:
driverSupportsAbsolute false by default
See also:
java.sql.ResultSet.absolute(int)
	public void setDriverSupportsAbsolute(boolean driverSupportsAbsolute) {
		this.driverSupportsAbsolute = driverSupportsAbsolute;
	}

Indicate whether the connection used for the cursor should be used by all other processing, thus sharing the same transaction. If this is set to false, which is the default, then the cursor will be opened using its own connection and will not participate in any transactions started for the rest of the step processing. If you set this flag to true then you must wrap the DataSource in an ExtendedConnectionDataSourceProxy to prevent the connection from being closed and released after each commit. When you set this option to true, the statement used to open the cursor will be created with both the 'READ_ONLY' and 'HOLD_CURSORS_OVER_COMMIT' options. This allows holding the cursor open over transaction starts and commits performed in the step processing. To use this feature you need a database that supports it and a JDBC driver supporting JDBC 3.0 or later.

Parameters:
useSharedExtendedConnection false by default
	public void setUseSharedExtendedConnection(boolean useSharedExtendedConnection) {
		this.useSharedExtendedConnection = useSharedExtendedConnection;
	}
	public boolean isUseSharedExtendedConnection() {
		return useSharedExtendedConnection;
	}
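The 'READ_ONLY' and hold-over-commit options mentioned above correspond to constants on java.sql.ResultSet that are passed to the four-argument Connection.prepareStatement overload introduced with JDBC 3.0. The following is a minimal sketch of how a subclass's openCursor might request such a statement; the class HoldableCursorSketch and its recording proxy connection are hypothetical and exist only so the call can be exercised without a database.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HoldableCursorSketch {
	// Ask the driver for a forward-only, read-only statement whose cursor survives commits.
	static PreparedStatement prepareHoldable(Connection con, String sql) throws SQLException {
		return con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
				ResultSet.HOLD_CURSORS_OVER_COMMIT);
	}

	// Hypothetical stand-in connection: records the arguments passed to prepareStatement.
	static List<Object> recordedArguments(String sql) {
		List<Object> seen = new ArrayList<>();
		Connection con = (Connection) Proxy.newProxyInstance(
				HoldableCursorSketch.class.getClassLoader(),
				new Class<?>[] { Connection.class },
				(proxy, method, args) -> {
					if (method.getName().equals("prepareStatement")) {
						seen.addAll(Arrays.asList(args));
					}
					return null; // no real statement is created in this sketch
				});
		try {
			prepareHoldable(con, sql);
		}
		catch (SQLException e) {
			throw new IllegalStateException(e);
		}
		return seen;
	}

	public static void main(String[] argv) {
		System.out.println(recordedArguments("SELECT id FROM items"));
	}
}
```

Whether the cursor really stays open across commits still depends on the database and driver, as the description above notes.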
	public abstract String getSql();

Check that the result set is in sync with the currentRow attribute. This is important to ensure that the user hasn't modified the current row.
	private void verifyCursorPosition(long expectedCurrentRow) throws SQLException {
		if (verifyCursorPosition) {
			if (expectedCurrentRow != this.rs.getRow()) {
				throw new InvalidDataAccessResourceUsageException("Unexpected cursor position change.");
			}
		}
	}

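Close the cursor and database connection. Makes a call to cleanupOnClose so subclasses can clean up any resources they have allocated.
	protected void doClose() throws Exception {
		initialized = false;
		JdbcUtils.closeResultSet(this.rs);
		rs = null;
		cleanupOnClose();
		if (useSharedExtendedConnection && dataSource instanceof ExtendedConnectionDataSourceProxy) {
			((ExtendedConnectionDataSourceProxy) dataSource).stopCloseSuppression(this.con);
			if (!TransactionSynchronizationManager.isActualTransactionActive()) {
				DataSourceUtils.releaseConnection(con, dataSource);
			}
		}
		else {
			JdbcUtils.closeConnection(this.con);
		}
	}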
	protected abstract void cleanupOnClose()  throws Exception;

Execute the statement to open the cursor.
	protected final void doOpen() throws Exception {
		Assert.state(!initialized, "Stream is already initialized.  Close before re-opening.");
		Assert.isNull(rs, "ResultSet still open!  Close before re-opening.");
		initializeConnection();
		openCursor(con);
		initialized = true;
	}
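	protected void initializeConnection() {
		Assert.state(getDataSource() != null, "DataSource must not be null.");
		try {
			if (useSharedExtendedConnection) {
				if (!(getDataSource() instanceof ExtendedConnectionDataSourceProxy)) {
					throw new InvalidDataAccessApiUsageException(
							"You must use a ExtendedConnectionDataSourceProxy for the dataSource when " +
							"useSharedExtendedConnection is set to true.");
				}
				this.con = DataSourceUtils.getConnection(dataSource);
				((ExtendedConnectionDataSourceProxy) dataSource).startCloseSuppression(this.con);
			}
			else {
				this.con = dataSource.getConnection();
			}
		}
		catch (SQLException se) {
			throw getExceptionTranslator().translate("Executing query", getSql(), se);
		}
	}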
	protected abstract void openCursor(Connection con);

Read next row and map it to item, verify cursor position if setVerifyCursorPosition(boolean) is true.
	protected T doRead() throws Exception {
		if (rs == null) {
			throw new ReaderNotOpenException("Reader must be open before it can be read.");
		}
		try {
			if (!rs.next()) {
				return null;
			}
			int currentRow = getCurrentItemCount();
			T item = readCursor(rs, currentRow);
			verifyCursorPosition(currentRow);
			return item;
		}
		catch (SQLException se) {
			throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
		}
	}
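The position check in doRead can be illustrated with an in-memory stand-in for the ResultSet. The following is a minimal sketch under stated assumptions: ListCursor, GOOD_MAPPER, BAD_MAPPER and readAll are hypothetical names invented here, and a plain IllegalStateException replaces the Spring exception. It shows that a mapper which advances the cursor itself is detected because the cursor's row number no longer matches the number of items read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class CursorVerificationSketch {
	// Hypothetical in-memory cursor; getRow() is 1-based like java.sql.ResultSet.getRow().
	static class ListCursor {
		private final List<String> rows;
		private int row = 0;

		ListCursor(List<String> rows) {
			this.rows = rows;
		}

		boolean next() {
			if (row < rows.size()) {
				row++;
				return true;
			}
			return false;
		}

		String current() {
			return rows.get(row - 1);
		}

		int getRow() {
			return row;
		}
	}

	// A well-behaved mapper only reads the current row.
	static final Function<ListCursor, String> GOOD_MAPPER = ListCursor::current;

	// A misbehaving mapper also advances the cursor, which the reader must detect.
	static final Function<ListCursor, String> BAD_MAPPER = c -> {
		String value = c.current();
		c.next();
		return value;
	};

	// Read every row, verifying after each mapping that the cursor sits on the expected row.
	static List<String> readAll(List<String> rows, Function<ListCursor, String> mapper) {
		ListCursor cursor = new ListCursor(rows);
		List<String> items = new ArrayList<>();
		int itemCount = 0;
		while (cursor.next()) {
			itemCount++;
			String item = mapper.apply(cursor);
			if (cursor.getRow() != itemCount) {
				throw new IllegalStateException("Unexpected cursor position change.");
			}
			items.add(item);
		}
		return items;
	}

	public static void main(String[] args) {
		System.out.println(readAll(Arrays.asList("a", "b", "c"), GOOD_MAPPER));
		try {
			readAll(Arrays.asList("a", "b", "c"), BAD_MAPPER);
		}
		catch (IllegalStateException e) {
			System.out.println("detected: " + e.getMessage());
		}
	}
}
```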

Read the cursor and map to the type of object this reader should return. This method must be overridden by subclasses.

Parameters:
rs The current result set
currentRow Current position of the result set
Returns:
the mapped object at the cursor position
Throws:
java.sql.SQLException
	protected abstract T readCursor(ResultSet rs, int currentRow) throws SQLException;

Use java.sql.ResultSet.absolute(int) if possible, otherwise scroll by calling java.sql.ResultSet.next().
	protected void jumpToItem(int itemIndex) throws Exception {
		if (driverSupportsAbsolute) {
			try {
				rs.absolute(itemIndex);
			}
			catch (SQLException e) {
				// Driver does not support rs.absolute(int) revert to
				// traversing ResultSet
				log.warn("The JDBC driver does not appear to support ResultSet.absolute(). Consider"
						+ " reverting to the default behavior setting the driverSupportsAbsolute to false", e);
				moveCursorToRow(itemIndex);
			}
		}
		else {
			moveCursorToRow(itemIndex);
		}
	}
}
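The restart strategy used by jumpToItem, try absolute positioning first and fall back to stepping forward, can be sketched without a JDBC driver. In the following minimal sketch the Cursor interface, ForwardOnlyCursor and positionAfterJump are hypothetical names introduced for illustration; only the two SQL exception classes come from java.sql.

```java
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;

class JumpToItemSketch {
	// Hypothetical minimal cursor contract mirroring the two ResultSet calls involved.
	interface Cursor {
		boolean next() throws SQLException;

		boolean absolute(int row) throws SQLException;
	}

	// Forward-only cursor over a fixed number of rows that rejects absolute positioning.
	static class ForwardOnlyCursor implements Cursor {
		private final int size;
		int row = 0;

		ForwardOnlyCursor(int size) {
			this.size = size;
		}

		public boolean next() {
			if (row < size) {
				row++;
				return true;
			}
			return false;
		}

		public boolean absolute(int target) throws SQLException {
			throw new SQLFeatureNotSupportedException("forward-only cursor");
		}
	}

	// Try absolute positioning when the flag is set; otherwise, or on failure, step forward.
	static void jumpToItem(Cursor cursor, int itemIndex, boolean driverSupportsAbsolute) throws SQLException {
		if (driverSupportsAbsolute) {
			try {
				cursor.absolute(itemIndex);
				return;
			}
			catch (SQLException e) {
				// Fall through to the row-by-row traversal below.
			}
		}
		int count = 0;
		while (count != itemIndex && cursor.next()) {
			count++;
		}
	}

	// Convenience wrapper: returns the row the cursor ends up on after the jump.
	static int positionAfterJump(int size, int itemIndex, boolean driverSupportsAbsolute) {
		ForwardOnlyCursor cursor = new ForwardOnlyCursor(size);
		try {
			jumpToItem(cursor, itemIndex, driverSupportsAbsolute);
		}
		catch (SQLException e) {
			throw new IllegalStateException(e);
		}
		return cursor.row;
	}

	public static void main(String[] args) {
		System.out.println(positionAfterJump(10, 4, true));
		System.out.println(positionAfterJump(10, 4, false));
	}
}
```

The traversal costs one next() call per skipped row, which is why the absolute-positioning path is recommended for drivers that support it when restarting against a large data set.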