Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  // This software is released into the Public Domain.  See copying.txt for details.
  package org.openstreetmap.osmosis.replication.v0_6;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Date;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * This class downloads a set of replication files from an HTTP server and tracks which files
 * have already been processed. The actual processing of changeset files is performed by
 * sub-classes. This class forms the basis of a replication mechanism.
 *
 * @author Brett Henderson
 */
 public abstract class BaseReplicationDownloader implements RunnableTask {
 	// Class-wide logger used for progress messages and best-effort error reporting.
 	private static final Logger LOG = Logger.getLogger(BaseReplicationDownloader.class.getName());
 	// Name of the lock file; presumably resolved against the working directory for the
 	// FileBasedLock created in run(), whose constructor argument is garbled in this copy — confirm.
 	private static final String LOCK_FILE = "download.lock";
 	// Name of the configuration file; not referenced anywhere in this copy (the configuration
 	// set-up in runImpl() appears to be missing) — confirm against upstream.
 	private static final String CONFIG_FILE = "configuration.txt";
 	// Name of the file holding the persisted local replication state; presumably used by the
 	// PropertiesPersister in runImpl(), whose File argument is garbled in this copy — confirm.
 	private static final String LOCAL_STATE_FILE = "state.txt";
 	// Directory containing configuration and tracking files, supplied to the constructor.
 	// NOTE(review): download() and runImpl() call getServerState()/getFormattedName() on
 	// receivers whose names are missing in this copy; the matching field declarations are
 	// presumably missing here too — confirm against upstream.
 	private File workingDirectory;
	/**
	 * Creates a new instance.
	 *
	 * @param workingDirectory
	 *            The directory containing configuration and tracking files.
	 */
 	public BaseReplicationDownloader(File workingDirectory) {
 		this. = workingDirectory;

	/**
	 * Provides sub-classes with access to the working directory.
	 *
	 * @return The working directory for the task.
	 */
 	protected File getWorkingDirectory() {
 		return ;

	/**
	 * Downloads the file with the specified name from the server and writes it to a local
	 * temporary file.
	 *
	 * @param fileName
	 *            The name of the file to download.
	 * @param baseUrl
	 *            The URL of the directory containing change files.
	 * @return The temporary file containing the downloaded data.
	 */
 	private File downloadReplicationFile(String fileNameURL baseUrl) {
 		URL changesetUrl;
 		InputStream inputStream = null;
 		OutputStream outputStream = null;
 		try {
 			changesetUrl = new URL(baseUrlfileName);
 		} catch (MalformedURLException e) {
 			throw new OsmosisRuntimeException("The server file URL could not be created."e);
 		try {
			File outputFile;
			byte[] buffer;
			// Open an input stream for the changeset file on the server.
			URLConnection connection = changesetUrl.openConnection();
			connection.setReadTimeout(15 * 60 * 1000); // timeout 15 minutes
			connection.setConnectTimeout(15 * 60 * 1000); // timeout 15 minutes
			inputStream = connection.getInputStream();
			source = new BufferedInputStream(inputStream, 65536);
			// Create a temporary file to write the data to.
			outputFile = File.createTempFile("change"null);
			// Open a output stream for the destination file.
			outputStream = new FileOutputStream(outputFile);
			sink = new BufferedOutputStream(outputStream, 65536);
			// Download the file.
			buffer = new byte[65536];
			for (int bytesRead =; bytesRead > 0; bytesRead = {
				sink.write(buffer, 0, bytesRead);
			// Clean up all file handles.
			inputStream = null;
			outputStream = null;
			return outputFile;
catch (IOException e) {
			throw new OsmosisRuntimeException("Unable to read the changeset file " + fileName + " from the server."e);
finally {
			try {
				if (inputStream != null) {
catch (IOException e) {
				// We are already in an error condition so log and continue.
				.log(."Unable to changeset download stream."e);
			try {
				if (outputStream != null) {
catch (IOException e) {
				// We are already in an error condition so log and continue.
				.log(."Unable to changeset output stream."e);
	private void processReplicationFile(File replicationFileReplicationState replicationState) {
		try {
			XmlChangeReader xmlReader;
			// Send the contents of the replication file to the sink but suppress the complete
			// and release methods.
			xmlReader = new XmlChangeReader(replicationFiletrue.);
			// Delegate to the sub-class to process the xml.
finally {
			if (!replicationFile.delete()) {
				.warning("Unable to delete file " + replicationFile.getName());

	/**
	 * Determines the maximum timestamp of data to be downloaded during this invocation. This may
	 * be overridden by sub-classes, but sub-classes must call this implementation first and then
	 * limit the maximum timestamp further if needed. A sub-class may never increase the maximum
	 * timestamp beyond that calculated by this method.
	 *
	 * @param configuration
	 *            The configuration.
	 * @param serverTimestamp
	 *            The timestamp of the latest data on the server.
	 * @param localTimestamp
	 *            The timestamp of the most recently downloaded data.
	 * @return The maximum timestamp for this invocation.
	 */
	protected Date calculateMaximumTimestamp(ReplicationDownloaderConfiguration configurationDate serverTimestamp,
			Date localTimestamp) {
		Date maximumTimestamp;
		maximumTimestamp = serverTimestamp;
		// Limit the duration according to the maximum defined in the configuration.
		if (configuration.getMaxInterval() > 0) {
			if ((serverTimestamp.getTime() - localTimestamp.getTime())
configuration.getMaxInterval()) {
				maximumTimestamp = new Date(localTimestamp.getTime() + configuration.getMaxInterval());
		.finer("Maximum timestamp is " + maximumTimestamp);
		return maximumTimestamp;
			ReplicationState initialLocalState) {
		// Downloads replication files one sequence number at a time, starting after
		// initialLocalState, and returns the resulting local state. The loop runs while the
		// local sequence number is below the server's; the comments below indicate it is also
		// meant to stop once the maximum download timestamp is reached.
		//
		// NOTE(review): this copy is damaged. The declaration line(s) before this parameter are
		// missing (runImpl() calls download with configuration, serverState and the local state,
		// and this body returns a ReplicationState). Several argument separators and receiver
		// names are also lost, and some statements are absent; each is flagged below. Restore
		// from upstream before relying on this method.
		URL baseUrl;
		ReplicationState localState;
		Date maximumDownloadTimestamp;
		localState = initialLocalState;
		// Determine the location of download files.
		baseUrl = configuration.getBaseUrl();
		// Determine the maximum timestamp that can be downloaded.
		// NOTE(review): the separator between the first two arguments below is missing.
		maximumDownloadTimestamp =
			calculateMaximumTimestamp(configurationserverState.getTimestamp(), localState.getTimestamp());
		// NOTE(review): logger receiver missing below (presumably LOG).
		.fine("The maximum timestamp to be downloaded is " + maximumDownloadTimestamp + ".");
		// Download all files and send their contents to the sink.
		while (localState.getSequenceNumber() < serverState.getSequenceNumber()) {
			File replicationFile;
			long sequenceNumber;
			ReplicationState fileReplicationState;
			// Check to see if our local state has already reached the maximum
			// allowable timestamp. This will typically occur if a job is run
			// again before new data becomes available, or if an implementation
			// of this class (eg. ReplicationFileMerger) is waiting for a full
			// time period of data to become available before processing.
			if (localState.getTimestamp().compareTo(maximumDownloadTimestamp) >= 0) {
				// NOTE(review): the body of this if is missing in this copy. Per the comment
				// above, the loop should stop here (presumably "break;") — confirm.
			// Calculate the next sequence number.
			sequenceNumber = localState.getSequenceNumber() + 1;
			// NOTE(review): logger receiver missing below (presumably LOG).
			.finer("Processing replication sequence " + sequenceNumber + ".");
			// Get the state associated with the next file.
			// NOTE(review): the receiver of getServerState and the separator between its two
			// arguments are missing below — confirm against upstream.
			fileReplicationState = .getServerState(baseUrlsequenceNumber);
			// Ensure that the next state is within the allowable timestamp
			// range. We must stop if the next data takes us beyond the maximum
			// timestamp. This will either occur if a maximum download time
			// duration limit has been imposed, or if a time-aligned boundary
			// has been reached.
			if (fileReplicationState.getTimestamp().compareTo(maximumDownloadTimestamp) > 0) {
				// We will always allow at least one replication interval
				// through to deal with the case where a single interval exceeds
				// the maximum duration. This can happen if the source data has
				// a long time gap between two intervals due to system downtime.
				if (localState.getSequenceNumber() != initialLocalState.getSequenceNumber()) {
					// NOTE(review): body missing in this copy. Per the comment above, the loop
					// should stop here once at least one file has been processed in this run
					// (presumably "break;") — confirm.
			// Download the next replication file to a temporary file.
			// NOTE(review): the receiver of getFormattedName and the separator between its
			// arguments are missing below — confirm against upstream.
			replicationFile =
				downloadReplicationFile(.getFormattedName(sequenceNumber".osc.gz"), baseUrl);
			// Process the file and send its contents to the sink.
			// NOTE(review): the statement that processes replicationFile is missing in this copy,
			// so the downloaded temporary file is never consumed or deleted here (presumably
			// processReplicationFile(replicationFile, fileReplicationState)) — confirm.
			// Update the local state to reflect the file state just processed.
			localState = fileReplicationState;
		return localState;
	private void runImpl() {
		// Performs one replication pass. It reads the server state. If a local state has been
		// persisted, it loads that state and downloads the files published since then.
		// Otherwise it adopts the server state as the initial local state.
		//
		// NOTE(review): this copy is damaged; several statements and arguments are missing and
		// are flagged below. Restore from upstream before relying on this method.
		try {
			ReplicationState serverState;
			ReplicationState localState;
			PropertiesPersister localStatePersistor;
			// Instantiate utility objects.
			// NOTE(review): no statements follow this comment, yet `configuration` is used below
			// without being declared or assigned. Presumably it was created here (CONFIG_FILE is
			// not referenced anywhere in this copy) — confirm.
			// Obtain the server state.
			// NOTE(review): the receivers of fine() and getServerState() below are missing.
			.fine("Reading current server state.");
			serverState = .getServerState(configuration.getBaseUrl());
			// Build the local state persister which is used for both loading and storing local state.
			// NOTE(review): the File constructor arguments below are missing; presumably the
			// working directory and LOCAL_STATE_FILE — confirm.
			localStatePersistor = new PropertiesPersister(new File());
			// Begin processing.
			// NOTE(review): no statement follows; presumably the processInitialize(...) hook is
			// invoked here — confirm.
			// If local state isn't available we need to copy server state to be the initial local state
			// then exit.
			if (localStatePersistor.exists()) {
				localState = new ReplicationState(localStatePersistor.loadMap());
				// Download and process the replication files.
				// NOTE(review): the separators between the arguments below are missing.
				localState = download(configurationserverStatelocalState);
else {
				// First run: adopt the server state as the starting point.
				// NOTE(review): the first-run hook (presumably processInitializeState(localState))
				// is not invoked in this copy — confirm.
				localState = serverState;
			// Commit downstream changes.
			// NOTE(review): no statement follows; presumably processComplete() — confirm.
			// Persist the local state.
			// NOTE(review): the statement that stores localState through localStatePersistor
			// appears to be missing (a stray ";" was left on this comment line) — confirm.
finally {
			// NOTE(review): the finally body is missing in this copy; presumably processRelease()
			// is invoked here so sub-classes can clean up regardless of outcome — confirm.

	/**
	 * This is called before any processing is performed. It allows any setup activities to be
	 * performed.
	 *
	 * @param metaData
	 *            The meta data associated with this processing request (empty in the current
	 *            implementation).
	 */
	protected abstract void processInitialize(Map<StringObjectmetaData);

	/**
	 * Invoked once during the first execution run to allow initialisation based on the initial
	 * replication state downloaded from the server.
	 *
	 * @param initialState
	 *            The first server state.
	 */
	// NOTE(review): runImpl() in this copy never invokes this hook; its first-run (else) branch
	// presumably should — confirm against upstream.
	protected abstract void processInitializeState(ReplicationState initialState);

	/**
	 * Processes the changeset.
	 *
	 * @param xmlReader
	 *            The changeset reader initialised to point to the changeset file.
	 * @param replicationState
	 *            The replication state associated with the changeset file.
	 */
	protected abstract void processChangeset(XmlChangeReader xmlReaderReplicationState replicationState);

	/**
	 * This is implemented by sub-classes and is called when all changesets have been processed.
	 * This should perform any completion tasks such as committing changes to a database.
	 */
	// NOTE(review): runImpl() in this copy has an empty "Commit downstream changes." step, which
	// is presumably where this hook is invoked — confirm against upstream.
	protected abstract void processComplete();

	/**
	 * This is implemented by sub-classes and is called at the completion of all processing,
	 * regardless of whether it was successful. This should perform any cleanup tasks such as
	 * closing files or releasing database connections.
	 */
	// NOTE(review): runImpl()'s finally block is empty in this copy; this release hook is
	// presumably meant to be invoked there — confirm against upstream.
	protected abstract void processRelease();

	/**
	 * Runs the replication task.
	 * <p>
	 * NOTE(review): this copy is damaged. The FileBasedLock constructor argument and the bodies
	 * of the try and finally blocks are missing. Presumably the lock (LOCK_FILE in the working
	 * directory) is acquired, runImpl() is invoked, and the lock is released in the finally
	 * block — confirm against upstream.
	 */
	public void run() {
		FileBasedLock fileLock;
		fileLock = new FileBasedLock(new File());
		try {
finally {
New to GrepCode? Check out our FAQ X