Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  // This software is released into the Public Domain.  See copying.txt for details.
  package org.openstreetmap.osmosis.replication.v0_6;
  
  import java.io.File;
  import java.util.Date;
  import java.util.Map;
  
Consumes the files in a replication directory and combines them into larger replication files grouped by a time interval. This allows replication files created at regular intervals to be combined into larger files for more efficient consumption where latency is less of an issue.
 
 	private static final Logger LOG = Logger.getLogger(ReplicationFileMerger.class.getName());
 
 	private static final String DATA_DIRECTORY = "data";
 	private static final String CONFIG_FILE = "configuration.txt";
 
 	private boolean sinkActive;
 	private ChangeSink changeSink;
Creates a new instance.

Parameters:
workingDirectory The directory containing configuration and tracking files.
 
 	public ReplicationFileMerger(File workingDirectory) {
 		super(workingDirectory);
 
 
 		 = false;
 	}
 
 
 	private Date alignDateToIntervalBoundary(Date requestedDatelong intervalLength) {
 		long remainder;
 
 		remainder = requestedDate.getTime() % intervalLength;
 
 		if (remainder > 0) {
 			return new Date(requestedDate.getTime() - remainder);
 		} else {
 			return requestedDate;
 		}
 	}


 
 	protected Date calculateMaximumTimestamp(ReplicationDownloaderConfiguration configurationDate serverTimestamp,
 			Date localTimestamp) {
 		Date maximumTimestamp;
 		long intervalLength;
 
 		// Read the current persisted state.
 
 		// Get the default maximum timestamp according to base calculations.
 		maximumTimestamp = super.calculateMaximumTimestamp(configurationserverTimestamplocalTimestamp);
 
 		// Align the maximum timestamp to an interval boundary.
 		intervalLength = getConfiguration().getIntervalLength();
 		if (intervalLength > 0) {
 			maximumTimestamp = alignDateToIntervalBoundary(maximumTimestampintervalLength);
 
 			// For the first sequence file, we make sure we make sure that the
 			// maximum timestamp is
 			// ahead of the data timestamp. If it isn't, we move the maximum
 			// timestamp backwards by
 			// one interval to address the case where the local timestamp is
 			// behind the data
 			// timestamp causing some data to be downloaded and processed.
 				if (maximumTimestamp.compareTo(.getTimestamp()) <= 0) {
 					maximumTimestamp = new Date(maximumTimestamp.getTime() - intervalLength);
				}
			}
		}
		// If the maximum timestamp exceeds the current local timestamp, but
		// does not exceed the current data timestamp then we shouldn't perform
		// any processing. If we download data we'll be forced to open a new
		// data file for the next interval which will not be populated fully
		// if the maximum timestamp is not high enough. To stop processing, we
		// simply set the maximum timestamp to equal the current local
		// timestamp.
		if ((maximumTimestamp.compareTo(localTimestamp) > 0)
				&& (maximumTimestamp.compareTo(.getTimestamp()) <= 0)) {
			maximumTimestamp = localTimestamp;
		}
		.finer("Maximum timestamp is " + maximumTimestamp);
		return maximumTimestamp;
	}
	private ChangeSink buildResultWriter(long sequenceNumber) {
		XmlChangeWriter xmlChangeWriter;
		ChangeSorter changeSorter;
		xmlChangeWriter = .saveData(sequenceNumber);
		changeSorter.setChangeSink(xmlChangeWriter);
		return changeSorter;
	}
	private void writeChangeset(XmlChangeReader xmlReader) {
		final ChangeSink localChangeSink = ;
		xmlReader.setChangeSink(new ChangeSink() {
			private ChangeSink suppressedWriter = localChangeSink;
			public void initialize(Map<StringObjectmetaData) {
				// Suppress the call.
			}
			public void process(ChangeContainer change) {
			}
			public void complete() {
				// Suppress the call.
			}
			public void release() {
				// Suppress the call.
			}
		});
		xmlReader.run();
	}
	}


	protected void processInitialize(Map<StringObjectmetaData) {
		// Do nothing.
	}


	protected void processInitializeState(ReplicationState initialState) {
		Date initialDate;
		Date alignedDate;
		long intervalLength;
		intervalLength = getConfiguration().getIntervalLength();
		initialDate = initialState.getTimestamp();
		// Align the date to an interval boundary.
		alignedDate = alignDateToIntervalBoundary(initialDateintervalLength);
		// If the date has been moved, then advance it to the next interval. We
		// do this because
		// during replication we never claim to have covered a time period that
		// we haven't received
		// data for. We may include extra data from a previous interval. By
		// advancing the stated
		// initial timestamp to the next interval our first replication will
		// include some data from
		// the previous interval.
		if (alignedDate.compareTo(initialDate) < 0) {
			alignedDate = new Date(alignedDate.getTime() + intervalLength);
		}
		// Create an initial replication state object.
		 = new ReplicationState(alignedDate, 0);
		// Write out the initial "0" state file.
	}


	protected void processChangeset(XmlChangeReader xmlReaderReplicationState replicationState) {
		int intervalLength;
		configuration = getConfiguration();
		// Get the configured interval length.
		intervalLength = configuration.getIntervalLength();
		// If this is the first time through, initialise a writer for the next
		// sequence number.
		if (!) {
			// Increment the current sequence number.
			// Initialise an output file for the new sequence number.
			.finer("Opening change sink for interval with sequence number " + .getSequenceNumber());
		}
		if (intervalLength > 0) {
			// If this is the first time through, align the timestamp at the
			// next boundary.
			if (!) {
				Date intervalEnd;
				intervalEnd = new Date(.getTimestamp().getTime() + intervalLength);
				intervalEnd = alignDateToIntervalBoundary(intervalEndintervalLength);
				.finer("End of current interval is " + intervalEnd);
			}
			// If the replication state has moved us past the current interval
			// end point we need to
			// open a new interval. This may occur many times if the current
			// replication state moves
			// us past several intervals.
			while (replicationState.getTimestamp().compareTo(.getTimestamp()) > 0) {
				// If we have an open changeset writer, close it and save the
				// current state.
				.finer("Closing change sink for interval with sequence number "
				// Update the state to match the next interval.
configuration.getIntervalLength()));
				// Begin a new interval.
				.finer("Opening change sink for interval with sequence number "
			}
else {
			// There is no maximum interval set, so simply update the current
			// state based on the
			// current replication state.
			.finer("End of current interval is " + replicationState.getTimestamp());
		}
		// Write the changeset to the writer.
		writeChangeset(xmlReader);
		// We are guaranteed to have an active writer at this point.
		 = true;
	}


	protected void processComplete() {
		if () {
			.finer("Closing change sink for interval with sequence number " + .getSequenceNumber());
			 = null;
			 = false;
		}
	}


	protected void processRelease() {
		if () {
			 = false;
		}
	}
New to GrepCode? Check out our FAQ X