Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.splout.db.dnode;
  
  /*
   * #%L
   * Splout SQL Server
   * %%
   * Copyright (C) 2012 - 2013 Datasalt Systems S.L.
   * %%
   * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Affero General Public License as published by
  * the Free Software Foundation, either version 3 of the License, or
  * (at your option) any later version.
  * 
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  * 
  * You should have received a copy of the GNU Affero General Public License
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  * #L%
  */
 
 import java.io.File;
 import java.net.URL;
 import java.util.Map;
 
 
A simple class that allows for fast (GZip), chunked transport of binary files between nodes through HTTP. This class is both a server and a client: use its Runnable method for creating a server that can receive files or use the send(java.lang.String,int,long,java.io.File,java.lang.String,boolean) method for sending a file to another peer.

For safety, every transfer checks whether the checksum (CRC32) matches the expected one or not.

This class should have the same semantics as Fetcher so it should save files to the same temp folder, etc. It can be configured by com.splout.db.common.SploutConfiguration.

 
 public class HttpFileExchanger extends Thread implements HttpHandler {
 
 	private final static Log log = LogFactory.getLog(HttpFileExchanger.class);
 
 	private File tempDir;
 
 	// this thread pool is passed to the HTTP server for handling incoming requests
 	// this thread pool is used for sending multiple files at the same time
 	private HttpServer server;
 
 	private AtomicBoolean isListening = new AtomicBoolean(false);
 	private AtomicBoolean isInit = new AtomicBoolean(false);
 
 	// This callback will be called when the files are received
 	
 	private Map<StringObjectcurrentTransfers = new HashMap<StringObject>();
 	private Object currentTransfersMonitor = new Object();
 	
 		this. = config;
 		this. = callback;
 	}
 
 	public interface ReceiveFileCallback {
 
 		public void onProgress(String tablespaceInteger partitionLong versionFile filelong totalSizelong sizeDownloaded);
 		public void onFileReceived(String tablespaceInteger partitionLong versionFile file);
 		public void onBadCRC(String tablespaceInteger partitionLong versionFile file);
 		public void onError(Throwable tString tablespaceInteger partitionLong versionFile file);
 	}

We initialize everything in an init() method to be able to catch explicit exceptions (otherwise that's not possible in Thread's run()).
	public void init() throws IOException {
		int httpPort = 0;
		int trials = 0;
		boolean bind = false;
		do {
			try {
				bind = true;
catch(BindException e) {
else {
					throw e;
				}
			}
while(!bind && trials < 50);
		// serve all http requests at root context
		.createContext("/"this);
		.set(true);
	}
	public String address() {
	}
	public void run() {
		if(!.get()) {
			throw new IllegalStateException("HTTP server must be init with init() method.");
		}
		.info("HTTP File exchanger LISTENING on port: "
	}
	public boolean isListening() {
		return .get();
	}
	public void close() {
		if( != null) {
			.warn("HTTP File exchanger STOPPED.");
		}
	}
	public void handle(HttpExchange exchangethrows IOException {
		DataInputStream iS = null;
		FileOutputStream writer = null;
		File dest = null;
		String tablespace = null
		Integer partition = null;
		Long version = null;
		try {
			iS = new DataInputStream(new GZIPInputStream(exchange.getRequestBody()));
			String fileName = exchange.getRequestHeaders().getFirst("filename");
			tablespace = exchange.getRequestHeaders().getFirst("tablespace");
			partition = Integer.valueOf(exchange.getRequestHeaders().getFirst("partition"));
			version = Long.valueOf(exchange.getRequestHeaders().getFirst("version"));
			dest = new File(new File(, DNodeHandler.getLocalStoragePartitionRelativePath(tablespace,
			    partitionversion)), fileName);
			// just in case, avoid copying the same file concurrently
			// (but we also shouldn't avoid this in other levels of the app)
			synchronized() {
					throw new IOException("Incoming file already being transferred - " + dest);
				}
			}
			if(!dest.getParentFile().exists()) {
			}
			if(dest.exists()) {
				dest.delete();
			}
			writer = new FileOutputStream(dest);
			byte[] buffer = new byte[.getInt(.)];
			Checksum checkSum = new CRC32();
			// 1- Read file size
			long fileSize = iS.readLong();
			.debug("Going to read file [" + fileName + "] of size: " + fileSize);
			// 2- Read file contents
			long readSoFar = 0;
			do {
				long missingBytes = fileSize - readSoFar;
				int bytesToRead = (int) Math.min(missingBytesbuffer.length);
				int read = iS.read(buffer, 0, bytesToRead);
				checkSum.update(buffer, 0, read);
				writer.write(buffer, 0, read);
				readSoFar += read;
				.onProgress(tablespacepartitionversiondestfileSizereadSoFar);
while(readSoFar < fileSize);
			// 3- Read CRC
			long expectedCrc = iS.readLong();
			if(expectedCrc == checkSum.getValue()) {
				.info("File [" + dest.getAbsolutePath() + "] received -> Checksum -- " + checkSum.getValue()
				    + " matches expected CRC [OK]");
				.onFileReceived(tablespacepartitionversiondest);
else {
				.error("File received [" + dest.getAbsolutePath() + "] -> Checksum -- " + checkSum.getValue()
				    + " doesn't match expected CRC: " + expectedCrc);
				.onBadCRC(tablespacepartitionversiondest);
				dest.delete();
			}
catch(Throwable t) {
			.onError(ttablespacepartitionversiondest);
			if(dest != null && dest.exists() && !t.getMessage().contains("Incoming file already being transferred")) {
				dest.delete();
			}
finally {
			if(writer != null) {
				writer.close();
			}
			if(iS != null) {
				iS.close();
			}
			if(dest != null) {
			}
		}
	}
	public void send(final String tablespacefinal int partitionfinal long version,
	    final File binaryFilefinal String urlboolean blockUntilComplete) {
		Future<?> future = .submit(new Runnable() {
			public void run() {
				DataOutputStream writer = null;
				InputStream input = null;
				try {
					HttpURLConnection connection = (HttpURLConnectionnew URL(url).openConnection();
					connection.setDoOutput(true);
					connection.setRequestProperty("filename"binaryFile.getName());
					connection.setRequestProperty("tablespace"tablespace);
					connection.setRequestProperty("partition"partition + "");
					connection.setRequestProperty("version"version + "");
					Checksum checkSum = new CRC32();
					writer = new DataOutputStream(new GZIPOutputStream(connection.getOutputStream()));
					// 1 - write file size
					writer.writeLong(binaryFile.length());
					writer.flush();
					// 2 - write file content
					input = new FileInputStream(binaryFile);
					byte[] buffer = new byte[.getInt(.)];
					long wrote = 0;
					for(int length = 0; (length = input.read(buffer)) > 0;) {
						writer.write(buffer, 0, length);
						checkSum.update(buffer, 0, length);
						wrote += length;
					}
					// 3 - add the CRC so that we can verify the download
					writer.writeLong(checkSum.getValue());
					writer.flush();
					.info("Sent file " + binaryFile + " to " + url + " with #bytes: " + wrote
					    + " and checksum: " + checkSum.getValue());
catch(IOException e) {
finally {
					try {
						if(input != null) {
							input.close();
						}
						if(writer != null) {
							writer.close();
						}
catch(IOException ignore) {
					}
				}
			}
		});
		try {
			if(blockUntilComplete) {
				while(future.isDone() || future.isCancelled()) {
					Thread.sleep(1000);
				}
			}
catch(InterruptedException e) {
			// interrupted!
		}
	}
  	return ;
  }
	// Use this main for testing purposes, as a client
	// args: [file] [server]
	public static void main(String[] argsthrows IOException {
		SploutConfiguration conf = SploutConfiguration.get();
		HttpFileExchanger fileExchanger = new HttpFileExchanger(confnew ReceiveFileCallback() {
			public void onProgress(String tablespaceInteger partitionLong versionFile filelong totalSize,
			    long sizeDownloaded) {
			}
			public void onFileReceived(String tablespaceInteger partitionLong versionFile file) {
			}
			public void onError(Throwable tString tablespaceInteger partitionLong versionFile file) {
			}
			public void onBadCRC(String tablespaceInteger partitionLong versionFile file) {
			}
		});
		fileExchanger.init();
		fileExchanger.send("t1", 0, 1l, new File(args[0]), args[1], true);
		fileExchanger.close();
	}
New to GrepCode? Check out our FAQ X