Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  // This software is released into the Public Domain.  See copying.txt for details.
  package org.openstreetmap.osmosis.replicationhttp.v0_6.impl;
  
  import java.io.File;
 import java.util.Date;
 
A sequence server handler implementation that sends the replication data associated with sequence numbers.

Author(s):
Brett Henderson
 
 
 	private static final Logger LOG = Logger.getLogger(ReplicationDataServerHandler.class.getName());
 	private static final String REQUEST_DATE_FORMAT = "yyyy-MM-dd-HH-mm-ss";
 	private static final int CHUNK_SIZE = 4096;
 
 	private File dataDirectory;
 	private boolean fileSizeSent;
 	private boolean includeData;
Creates a new instance.

Parameters:
control Provides the Netty handlers with access to the controller.
dataDirectory The directory containing the replication data files.
 
 	public ReplicationDataServerHandler(SequenceServerControl controlFile dataDirectory) {
 		super(control);
 
 		this. = dataDirectory;
 
 	}
 
 
 		Calendar calendar = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
 		dateParser.setCalendar(calendar);
 
 		return dateParser;
 	}
 
 
 	private File getStateFile(long sequenceNumber) {
 		return new File(.getFormattedName(sequenceNumber".state.txt"));
 	}
 
 
 	private File getDataFile(long sequenceNumber) {
 		return new File(.getFormattedName(sequenceNumber".osc.gz"));
 	}
 
 
 	private ReplicationState getReplicationState(long sequenceNumber) {
 		PropertiesPersister persister = new PropertiesPersister(getStateFile(sequenceNumber));
 		ReplicationState state = new ReplicationState();
 		state.load(persister.loadMap());
 
		return state;
	}


Search through the replication state records and find the nearest replication number with a timestamp earlier or equal to the requested date. It is not sufficient to find the minimum known sequence record with a timestamp greater than the requested date because there may be missing replication records in between.

Parameters:
lastDate The last date known by the client.
Returns:
The associated sequence number.
	private long getNextSequenceNumberByDate(Date lastDate) {
		long startBound = 0;
		long endBound = getControl().getLatestSequenceNumber();
		// If the requested date is greater than or equal to the latest known
		// timestamp we should return our latest sequence number so that the
		// client will start receiving all new records as they arrive with
		// possibly some duplicated change records.
		if (lastDate.compareTo(getReplicationState(endBound).getTimestamp()) >= 0) {
			return endBound;
		}
		// Continue splitting our range in half until either we find the
		// requested record, or we only have one possibility remaining.
		while ((endBound - startBound) > 1) {
			// Calculate the current midpoint.
			long midPoint = startBound + ((endBound - startBound) / 2);
			// If the midpoint doesn't exist we need to reset the start bound to
			// the midpoint and search again.
			if (!getStateFile(midPoint).exists()) {
				startBound = midPoint;
				continue;
			}
			// If the midpoint timestamp is greater we search in the lower half,
			// otherwise the higher half.
			int comparison = lastDate.compareTo(getReplicationState(midPoint).getTimestamp());
			if (comparison == 0) {
				// We have an exact match so stop processing now.
				return midPoint;
else if (comparison < 0) {
				// We will now search in the lower half of the search range.
				// Even though we know the midpoint is not the right value, we
				// include it in the next range because our search assumes that
				// the right sequence number is less than the end point.
				endBound = midPoint;
else {
				// We will now search in the upper half of the search range.
				// Even though the mid point has a timestamp less than the
				// requested value, it still may be the selected value if the
				// next timestamp is greater.
				startBound = midPoint;
			}
		}
		// We only have one possibility remaining which is the start bound. This
		// is the requested record if it exists and has a timestamp less than or
		// equal to that requested.
		if (getStateFile(startBound).exists()
				&& lastDate.compareTo(getReplicationState(startBound).getTimestamp()) >= 0) {
			return startBound;
else {
			// We cannot find any replication records with an early enough date.
			// This typically means that replication records for that time
			// period either no longer exist or never existed.
			throw new ResourceGoneException();
		}
	}
	private FileChannel openFileChannel(File file) {
		try {
			return new FileInputStream(file).getChannel();
catch (FileNotFoundException e) {
			throw new OsmosisRuntimeException("Unable to open file " + filee);
		}
	}
	private ChannelBuffer readFromFile(FileChannel fileChannelint bytesToRead) {
		try {
			// Allocate a buffer for the data to be read.
			byte[] rawBuffer = new byte[bytesToRead];
			// Copy data into the buffer using NIO.
			ByteBuffer nioBuffer = ByteBuffer.wrap(rawBuffer);
			for (int bytesRead = 0; bytesRead < bytesToRead;) {
				int lastBytesRead = fileChannel.read(nioBuffer);
				// We always expect to read data.
				if (lastBytesRead < 0) {
					throw new OsmosisRuntimeException("Unexpectedly reached the end of the replication data file");
				}
				if (lastBytesRead == 0) {
					throw new OsmosisRuntimeException("Last read of the replication data file returned 0 bytes");
				}
				bytesRead += lastBytesRead;
			}
			// Create and return a Netty buffer.
			return ChannelBuffers.wrappedBuffer(rawBuffer);
catch (IOException e) {
			throw new OsmosisRuntimeException("Unable to read from the replication data file"e);
		}
	}
	private ChannelBuffer loadFile(File file) {
		FileChannel fileChannel = openFileChannel(file);
		try {
			if (fileChannel.size() > .) {
				throw new OsmosisRuntimeException("Maximum file size supported is " + . + " bytes");
			}
			// Determine the size of the file.
			int fileSize = (intfileChannel.size();
			// Read the entire file.
			ChannelBuffer buffer = readFromFile(fileChannelfileSize);
			// We no longer need access to the file.
			fileChannel.close();
			return buffer;
catch (IOException e) {
			throw new OsmosisRuntimeException("Unable to read from file " + filee);
finally {
			try {
				fileChannel.close();
catch (IOException e) {
				.log(."Unable to close channel for file " + filee);
			}
		}
	}
		try {
			// Determine how many bytes are left in the file.
			// We will only send up to our maximum chunk size.
			if (remaining > ) {
				remaining = ;
			}
			// Read the next data for the next chunk.
			ChannelBuffer buffer = readFromFile(, (intremaining);
			// Close the file if we've reached the end.
			}
			return buffer;
catch (IOException e) {
			throw new OsmosisRuntimeException("Unable to read from the replication data file"e);
		}
	}
	private ChannelBuffer buildChunkHeader(long chunkSize) {
		return ChannelBuffers.copiedBuffer(Long.toString(chunkSize) + "\r\n".);
	}
	protected void handleRequest(ChannelHandlerContext ctxHttpRequest request) {
		final String replicationStateUri = "replicationState";
		final String replicationDataUri = "replicationData";
		final String textContentType = "text/plain";
		final String dataContentType = "application/octet-stream";
		// Split the request Uri into its path elements.
		String uri = request.getUri();
		if (!uri.startsWith("/")) {
			throw new OsmosisRuntimeException("Uri doesn't start with a / character: " + uri);
		}
		Queue<StringuriElements = new LinkedList<String>(Arrays.asList(uri.split("/")));
		uriElements.remove(); // First element is empty due to leading '/'.
		// First element must be either the replication state or replication
		// data uri which determines whether replication data will be included
		// or just the replication state.
		String contentType;
		if (uriElements.isEmpty()) {
		}
		String requestTypeString = uriElements.remove();
		if (replicationStateUri.equals(requestTypeString)) {
			contentType = textContentType;
			 = false;
else if (replicationDataUri.equals(requestTypeString)) {
			contentType = dataContentType;
			 = true;
else {
		}
		/*
		 * The next element determines which replication number to start from.
		 * The request is one of "current" or N where is the last sequence
		 * number received by the client.
		 */
		long nextSequenceNumber;
		if (uriElements.isEmpty()) {
		}
		String sequenceStartString = uriElements.remove();
		if ("current".equals(sequenceStartString)) {
			nextSequenceNumber = getControl().getLatestSequenceNumber();
else {
			// Try to parse the sequence start string as a number. If that fails
			// try to parse as a date.
			try {
				nextSequenceNumber = Long.parseLong(sequenceStartString);
catch (NumberFormatException e) {
				try {
					Date lastDate = getRequestDateParser().parse(sequenceStartString);
					nextSequenceNumber = getNextSequenceNumberByDate(lastDate);
catch (ParseException e1) {
					throw new BadRequestException("Requested sequence number of " + sequenceStartString
" is not a number, or a date in format yyyy-MM-dd-HH-mm-ss.");
				}
			}
		}
		// If the next element exists and is "tail" it means that the client
		// wants to stay connected and receive updated sequences as they become
		// available.
		boolean follow;
		if (!uriElements.isEmpty()) {
			String tailElement = uriElements.remove();
			if ("tail".equals(tailElement)) {
				follow = true;
else {
			}
else {
			follow = false;
		}
		// Validate that that no more URI elements are available.
		if (!uriElements.isEmpty()) {
		}
		// Begin sending replication sequence information to the client.
			.finer("New request details, includeData=" +  + ", sequenceNumber=" + nextSequenceNumber
", tail=" + follow);
		}
		initiateSequenceWriting(ctxcontentTypenextSequenceNumberfollow);
	}
	protected void writeSequence(ChannelHandlerContext ctxChannelFuture futurelong sequenceNumber) {
		// We do not support sending new replication data until the previous
		// send has completed.
		if ( != null) {
					"We cannot send new replication data until the previous write has completed");
		}
			.finest("Sequence being written, includeData=" +  + ", sequenceNumber="
sequenceNumber);
		}
		// We must save the future to attach to the final write.
		 = future;
		// Get the name of the replication data file.
		File stateFile = getStateFile(sequenceNumber);
		File dataFile = getDataFile(sequenceNumber);
		// Load the contents of the state file.
		ChannelBuffer stateFileBuffer = loadFile(stateFile);
		// Add a chunk length header.
		stateFileBuffer = ChannelBuffers.wrappedBuffer(buildChunkHeader(stateFileBuffer.readableBytes()),
				stateFileBuffer);
		// Only include replication data if initially requested by the client
		// and if this is not sequence 0.
		if ( && sequenceNumber > 0) {
			// Open the data file read for sending.
			 = false;
		}
		/*
		 * Send the state file to the client. If replication data is to be sent
		 * we will continue when we receive completion information via the
		 * writeComplete method. We must create a new future now if we have more
		 * data coming because we don't want the future of the current event to
		 * fire until we're completely finished processing.
		 */
		ChannelFuture writeFuture;
		if ( != null) {
			writeFuture = Channels.future(ctx.getChannel());
else {
			writeFuture = ;
		}
		Channels.write(ctxwriteFuturenew DefaultHttpChunk(stateFileBuffer));
	}
		if ( != null) {
			// We have an open file channel so we are still sending replication
			// data.
			if (!) {
				// Send a chunk header containing the size of the file.
				 = true;
				future = Channels.future(ctx.getChannel());
				buffer = fileSizeBuffer;
else {
				// Send the next chunk to the client.
				buffer = getFileChunk();
				if ( != null) {
					future = Channels.future(ctx.getChannel());
else {
					// This is the last write for this sequence so attach the original future.
					future = ;
				}
			}
			// Write the data to the channel.
			Channels.write(ctxfuturenew DefaultHttpChunk(buffer));
else {
			super.writeComplete(ctxe);
		}
	}
		// Close the in-progress chunk file channel if it exists.
		if ( != null) {
			try {
catch (IOException ex) {
				.log(."Unable to close the replication data file."ex);
			}
		}
		super.channelClosed(ctxe);
	}
New to GrepCode? Check out our FAQ X