Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  // This software is released into the Public Domain.  See copying.txt for details.
  package org.openstreetmap.osmosis.replicationhttp.v0_6.impl;
  
  import java.io.File;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
 
/**
 * Netty handler for receiving replication data and notifying listeners.
 *
 * @author Brett Henderson
 */
 
 
 	private static final Logger LOG = Logger.getLogger(ReplicationDataClientHandler.class.getName());
 
 	private ChangeSink changeSink;
 	private String pathPrefix;
 	private boolean sinkInitInvoked;
 	private boolean replicationStateReceived;
 	/**
 	 * Creates a new instance.
 	 *
 	 * @param control
 	 *            Provides the Netty handlers with access to the controller.
 	 * @param changeSink
 	 *            The destination for the replication data.
 	 * @param serverHost
 	 *            The name of the host system running the sequence server.
 	 * @param pathPrefix
 	 *            The base path to add to the URL. This is necessary if a data
 	 *            server is sitting behind a proxy server that adds a prefix to
 	 *            the request path.
 	 */
 
 	// Constructor: forwards to the superclass constructor, then stores the change
 	// sink and the path prefix.
 	// NOTE(review): this text looks like a web code-viewer capture that dropped
 	// tokens. Parameter separators and several identifiers are missing, so the
 	// block does not compile as shown; restore it from the original source.
 	public ReplicationDataClientHandler(SequenceClientControl controlChangeSink changeSinkString serverHost,
 			String pathPrefix) {
 		// Presumably super(control, serverHost); the separating comma is missing.
 		super(controlserverHost);
 
 		// Presumably this.changeSink and this.pathPrefix; the names after "this."
 		// are missing.
 		this. = changeSink;
 		this. = pathPrefix;
 
 
 		// NOTE(review): the assignment targets below are missing. The visible
 		// boolean fields (sinkInitInvoked, replicationStateReceived) are candidates
 		// for the first; the target of the null assignment is not declared in the
 		// visible text. The blank lines above may also have held statements that
 		// were lost. Confirm before repairing.
 		 = false;
 		 = null;
 	}
 
 
 	// Resets per-sequence state and, when a chunk file is supplied, runs an XML
 	// change reader over it. A null chunkFile skips the reader entirely.
 	// NOTE(review): several tokens are missing from this method (see inline
 	// notes), so it does not compile as shown.
 	private void sendReplicationData(File chunkFile) {
 		// Release all class level resources and prepare for passing the
 		// replication data downstream.
 		// NOTE(review): both assignment targets are missing; which fields are
 		// reset here cannot be determined from the visible text.
 		 = null;
 		 = false;
 		
 		// Send the replication data downstream but don't call any lifecycle
 		// methods on the change sink because we're managing those separately.
 		if (chunkFile != null) {
 			// NOTE(review): the constructor arguments are garbled ("chunkFiletrue.").
 			// Separators and at least one trailing argument are missing. No visible
 			// statement attaches a sink to the reader before run(); presumably that
 			// line was lost as well. Verify against the original source.
 			RunnableChangeSource changeReader = new XmlChangeReader(chunkFiletrue.);
 			changeReader.run();
 		}
 		
 	}
 
 
 	private void invokeSinkInit() {
 		Map<StringObjectmetaData = new HashMap<StringObject>(1);
 		.initialize(metaData);
 		 = true;
 	}
 
 
	// Builds the request path: a leading operand (missing here), followed by
	// "/replicationData/", the sequence number to request, and "/tail".
	// NOTE(review): tokens are missing from this method (see inline notes), so
	// it does not compile as shown.
	protected String getRequestUri() {
		// We need to know the last replication number that we have received on
		// a previous run. To do this we need to retrieve the replication state
		// from our downstream replication task by initializing.
		// NOTE(review): no initialization call is visible at this point although
		// the comment above describes one; presumably the statement was lost.
		// The downstream task returns the next sequence number.
		// NOTE(review): the receiver of getSequenceNumber() is missing.
		long requestSequenceNumber = .getSequenceNumber();
		// NOTE(review): the first operand is missing; presumably the pathPrefix
		// field, which is described as the base path to add to the URL. Confirm.
		return  + "/replicationData/" + requestSequenceNumber + "/tail";
	}
	private ReplicationState loadState(File stateFile) {
		PropertiesPersister persister = new PropertiesPersister(stateFile);
		state.load(persister.loadMap());
		return state;
	}
	// Splits the received buffer into chunk files, handles each chunk, and in a
	// finally block attempts to delete every chunk file, logging any delete
	// that fails.
	// NOTE(review): this method is badly damaged. Identifiers, conditions,
	// opening braces and whole statements are missing, so the braces below do
	// not balance and the control flow cannot be recovered reliably from the
	// visible text. Restore from the original source rather than patching by hand.
	protected void processMessageData(ChannelBuffer buffer) {
		// Break the data down according to chunk alignment.
		// NOTE(review): garbled declaration; presumably List<File> chunkFiles.
		// The receiver of processData() is missing.
		List<FilechunkFiles = .processData(buffer);
		try {
			for (File chunkFile : chunkFiles) {
					// NOTE(review): the enclosing condition(s) and the statement this
					// comment describes are missing; only a closing brace survives.
					// We usually have to invoke the sink init, but if this is
					// during startup we may have already performed this step
					// while preparing our initial request.
					}
					// The first chunk contains the replication state stored in
					// properties format.
					ReplicationState serverReplicationState = loadState(chunkFile);
						// NOTE(review): the logger reference and a surrounding guard are
						// missing; presumably LOG.finer(...) inside a level check.
						.finer("Received replication state " + serverReplicationState.getSequenceNumber());
					}
					// Validate that the server has sent us the expected state.
					// NOTE(review): the receiver on the right-hand side is missing, as is
					// the tail of the exception message: the "+" operators, the final
					// operand and the closing ");".
					if (serverReplicationState.getSequenceNumber() != .getSequenceNumber()) {
						throw new OsmosisRuntimeException("Received sequence number "
serverReplicationState.getSequenceNumber() + " from server, expected "
					}
					// Update the local state with server values.
					// NOTE(review): the receiver of setTimestamp() is missing.
					.setTimestamp(serverReplicationState.getTimestamp());
					// If this is replication 0, then we need to finish
					// processing now because the first sequence doesn't have
					// any data.
					// NOTE(review): the condition and body described above are missing.
					}
else {
				// NOTE(review): the else body is empty as captured; presumably the
				// handling of data chunks was lost here.
				}
			}
finally {
			// Delete all chunk files.
			for (File chunkFile : chunkFiles) {
				if (!chunkFile.delete()) {
					// NOTE(review): the logger reference and the level argument are missing.
					.log(."Unable to delete the current temporary chunk file " + chunkFile);
				}
			}
		}
	}
		// NOTE(review): these lines are the tail of a method whose signature line is
		// missing; the super call suggests an override of channelClosed. The call
		// arguments are garbled ("ctxe", presumably two arguments with a missing
		// comma). No release statement matching the comment below is visible;
		// presumably it was lost.
		// Release any half populated chunk files.
		super.channelClosed(ctxe);
	}

	/**
	 * This acts as a proxy between the xml change reader and the real change
	 * sink. The primary purpose is to only propagate calls to process because
	 * the lifecycle methods initialize, complete and release are managed
	 * separately.
	 */
	private static class NoLifecycleChangeSinkWrapper implements ChangeSink {
Creates a new instance.

Parameters:
changeSink The wrapped change sink.
			this. = changeSink;
		}
		public void initialize(Map<StringObjectmetaData) {
			// Do nothing.
		}
		public void process(ChangeContainer change) {
		}
		public void complete() {
			// Do nothing.
		}
		public void release() {
			// Do nothing.
		}
	}
New to GrepCode? Check out our FAQ X