Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.splout.db.dnode;
  
  /*
   * #%L
   * Splout SQL Server
   * %%
   * Copyright (C) 2012 Datasalt Systems S.L.
   * %%
   * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Affero General Public License as published by
  * the Free Software Foundation, either version 3 of the License, or
  * (at your option) any later version.
  * 
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  * 
  * You should have received a copy of the GNU Affero General Public License
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  * #L%
  */
 
 import java.io.File;
 import java.net.URI;
 
 import  org.apache.hadoop.conf.Configuration;
 import  org.apache.hadoop.fs.FSDataInputStream;
 import  org.apache.hadoop.fs.FSDataOutputStream;
 import  org.apache.hadoop.fs.FileStatus;
 import  org.apache.hadoop.fs.FileSystem;
 import  org.apache.hadoop.fs.Path;
 
This Fetcher is used by DNodeHandler to fetch data to deploy. It handles: file, HDFS and S3 URIs. For the S3 service it uses the JETS3T library.

The fetcher has to return a local File object which is a folder that can be used to perform an atomic "mv" operation. The folder has to contain the DB file.

 
 public class Fetcher {
 
 	private final static Log log = LogFactory.getLog(Fetcher.class);
 
 
 	Configuration hadoopConf;
 
 	public Fetcher(SploutConfiguration config) {
 		 = new Configuration();
 		if(fsName != null) {
 			.set("fs.default.name"fsName);
 		}
 		.info("Created " + Fetcher.class + " with tempDir = " + );
 		if( > 0) {
 			.info("Throttling at: " +  + " bytes per sec.");
 		} else {
 			.warn("No throttling. Fetched data will be transferred at full speed. This may affect query servicing.");
 		}
 	}
 
 		AWSCredentials credentials = new AWSCredentials();
 		return credentials;
 	}
 
 	/*
 	 * Fetch a file that is in a Hadoop file system. Return a local File.
 	 */
 	private File hdfsFetch(String paththrows IOException {
 		Path fromPath = new Path(path);
		File toFile = new File(fromPath.toUri().getPath());
		File toDir = new File(toFile.getParent());
		if(toDir.exists()) {
			FileUtils.deleteDirectory(toDir);
		}
		toDir.mkdirs();
		Path toPath = new Path(toFile.getCanonicalPath());
		FileSystem fS = fromPath.getFileSystem();
		FileSystem tofS = FileSystem.getLocal();
		Throttler throttler = new Throttler((double);
		for(FileStatus fStatus : fS.globStatus(fromPath)) {
			.info("Copying " + fStatus.getPath() + " to " + toPath);
			FSDataInputStream iS = fS.open(fStatus.getPath());
			FSDataOutputStream oS = tofS.create(toPath);
			byte[] buffer = new byte[];
			int nRead;
			while((nRead = iS.read(buffer, 0, buffer.length)) != -1) {
				oS.write(buffer, 0, nRead);
				throttler.incrementAndThrottle(nRead);
			}
			oS.close();
			iS.close();
		}
		return toDir;
	}

Implements basic throttling capabilities.
	public static class Throttler {
		double bytesPerSec;
		long lastTime = System.currentTimeMillis();
		public Throttler(double bytesPerSec) {
			this. = bytesPerSec;
		}
		public void incrementAndThrottle(int bytes) {
			if( < 1) { // no throttle at all
				return;
			}
			long currentTime = System.currentTimeMillis();
			long timeDiff = currentTime - ;
			if(timeDiff == 0) {
				timeDiff = 1;
			}
			double bytesPerSec = (bytes / (doubletimeDiff) * 1000;
			if(bytesPerSec > this.) {
				// Throttle
				double exceededByFactorOf = bytesPerSec / this.;
				try {
					long mustSleep = (long) ((exceededByFactorOf - 1) * timeDiff);
					Thread.sleep(mustSleep);
catch(InterruptedException e) {
				}
			}
		}
	}
	/*
	 * Fetch a file that is in a S3 file system. Return a local File. It accepts "s3://" and "s3n://" prefixes.
	 */
	private File s3Fetch(String fileUrlthrows IOException {
		URI uri;
		try {
			uri = new URI(fileUrl);
catch(URISyntaxException e1) {
			throw new RuntimeException("Bad URI passed to s3Fetch()! " + fileUrl);
		}
		String bucketName = uri.getHost();
		String path = uri.getPath();
		File destFolder = new File(bucketName + "/" + path);
		if(destFolder.exists()) {
			FileUtils.deleteDirectory(destFolder);
		}
		destFolder.mkdirs();
		Throttler throttler = new Throttler((double);
		boolean done = false;
		try {
				throw new IOException("Bucket doesn't exist or is already claimed: " + bucketName);
			}
			if(path.startsWith("/")) {
				path = path.substring(1, path.length());
			}
			for(S3Object object : .listObjects(new S3Bucket(bucketName), path"")) {
				String fileName = path;
				if(path.contains("/")) {
					fileName = path.substring(path.lastIndexOf("/") + 1, path.length());
				}
				File fileDest = new File(destFolderfileName);
				.info("Downloading " + object.getKey() + " to " + fileDest + " ...");
				if(fileDest.exists()) {
					fileDest.delete();
				}
				object = .getObject(new S3Bucket(bucketName), object.getKey());
				FileOutputStream writer = new FileOutputStream(fileDest);
				byte[] buffer = new byte[];
				int nRead;
				while((nRead = iS.read(buffer, 0, buffer.length)) != -1) {
					writer.write(buffer, 0, nRead);
					throttler.incrementAndThrottle(nRead);
				}
				writer.close();
				iS.close();
				done = true;
			}
			if(!done) {
				throw new IOException("Bucket is empty! " + bucketName + " path: " + path);
			}
catch(S3ServiceException e) {
			throw new IOException(e);
		}
		return destFolder;
	}
	/*
	 * Fetch a file that is in a local file system. Return a local File.
	 */
	private File fileFetch(URI urithrows IOException {
		File file = new File(uri);
		File toDir = new File(file.getParent() + "/" + file.getName());
		if(toDir.exists()) {
			FileUtils.deleteDirectory(toDir);
		}
		toDir.mkdirs();
		.info("Copying " + file + " to " + toDir);
		copyFile(filenew File(toDirfile.getName()));
		return toDir;
	}
	private void copyFile(File sourceFileFile destFilethrows IOException {
		if(!destFile.exists()) {
			destFile.createNewFile();
		}
		FileChannel source = null;
		FileChannel destination = null;
		Throttler throttler = new Throttler((double);
		FileInputStream iS = null;
		FileOutputStream oS = null;
		try {
			iS = new FileInputStream(sourceFile);
			oS = new FileOutputStream(destFile); 
			source = iS.getChannel();
			destination = oS.getChannel();
			long count = 0;
			long size = source.size();
			int transferred = 0;
			while(count < size) {
				// Casting to int here is safe since we will transfer at most "downloadBufferSize" bytes.
				// This is done on purpose for being able to implement Throttling.
				transferred = (intdestination.transferFrom(sourcecount);
				count += transferred;
				throttler.incrementAndThrottle(transferred);
			}
finally {
			if(iS != null) {
				iS.close();
			}
			if(oS != null) {
				oS.close();
			}
			if(source != null) {
				source.close();
			}
			if(destination != null) {
				destination.close();
			}
		}
	}

This is the main method that accepts a URI string and delegates the fetching to the appropriate private method.
	public File fetch(String uriStrthrows IOExceptionURISyntaxException {
		URI uri = new URI(uriStr);
		if(uriStr.startsWith("file:")) {
			return fileFetch(uri);
else if(uriStr.startsWith("s3")) {
			if(uriStr.startsWith("s3n")) {
				return s3Fetch(uriStr);
else {
				return s3Fetch(uriStr);
			}
else if(uriStr.startsWith("hdfs")) {
			return hdfsFetch(uriStr);
else {
			throw new IllegalArgumentException("Scheme not recognized or non-absolute URI provided: " + uri);
		}
	}
New to GrepCode? Check out our FAQ X