Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.splout.db.dnode;
  
  /*
   * #%L
   * Splout SQL Server
   * %%
   * Copyright (C) 2012 Datasalt Systems S.L.
   * %%
   * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Affero General Public License as published by
  * the Free Software Foundation, either version 3 of the License, or
  * (at your option) any later version.
  * 
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  * 
  * You should have received a copy of the GNU Affero General Public License
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  * #L%
  */
 
 import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 
 
The business logic for the DNode: responding to queries, downloading new deployments, handling ZooKeeper events and so forth.
 
 public class DNodeHandler implements IDNodeHandler {
 
 	private final static Log log = LogFactory.getLog(DNodeHandler.class);
 
 	private HazelcastInstance hz;
 
	// The {@link Fetcher} is the responsible for downloading new deployment data.
	protected Object deployLock = new Object();
	// This flag is needed for unit testing.
	// Indicates that the last deploy failed because of timeout. This info can then be answered via a status() request.
	// Thrift exception code used in DNodeException
	public final static int EXCEPTION_ORDINARY = 0;
	public final static int EXCEPTION_UNEXPECTED = 1;
	// A hard limit on the number of results that this DNode can return per SQL query
	private int maxResultsPerQuery;
	// The following variables are used for monitoring and providing statistics:
	private String lastException = null;
	private long lastExceptionTime;
	private long upSince;
	// The {@link Fetcher} is the responsible for downloading new deployment data.
	private Fetcher fetcher;
	// Above this query time the query will be logged as slow query
	private long absoluteSlowQueryLimit;
	private long slowQueries = 0;
	// This thread will interrupt long-running queries
	// This map will hold all the current balance file transactions being done
	public DNodeHandler(Fetcher fetcher) {
		this. = fetcher;
	}
	public DNodeHandler() {
	}

Returns the address (host:port) of this DNode.
	public String whoAmI() {
	}
		return "http://" + .getString(.) + ":"
	}

This inner class will listen for additions to the balance actions map, so that if a balance action has to be taken and this DNode is the one who has the send the file, it will start doing so.
	private class BalanceActionItemListener implements
		public void entryAdded(EntryEvent<BalanceActionStringevent) {
			BalanceAction action = event.getKey();
			if(action.getOriginNode().equals(whoAmI())) {
				// I must do a balance action!
				File toSend = new File(getLocalStorageFolder(action.getTablespace(), action.getPartition(),
				    action.getVersion()), action.getPartition() + ".db");
				File metadataFile = getLocalMetadataFile(action.getTablespace(), action.getPartition(),
				    action.getVersion());
				// send both the .db and the .meta file -> when the other part has both files it will move them atomically...
				.send(action.getTablespace(), action.getPartition(), action.getVersion(), toSend,
				    action.getFinalNode(), false);
				.send(action.getTablespace(), action.getPartition(), action.getVersion(),
				    metadataFileaction.getFinalNode(), false);
			}
		}
		public void entryRemoved(EntryEvent<BalanceActionStringevent) {
			// usually we won't care - but the final DNode might have pro-actively removed this action
		}
		public void entryUpdated(EntryEvent<BalanceActionStringevent) {
		}
		public void entryEvicted(EntryEvent<BalanceActionStringevent) {
		}
	}

This inner class will perform the business logic associated with receiving files: what to do on failures, bad CRC, file received OK...
		public void onProgress(String tablespaceInteger partitionLong versionFile file,
		    long totalSizelong sizeDownloaded) {
			if(file.getName().endsWith(".db")) {
				getProgressFromLocalPanel(tablespacepartitionversion).progressBinaryFile(totalSize,
				    sizeDownloaded);
			}
		}
		public void onFileReceived(String tablespaceInteger partitionLong versionFile file) {
			BalanceFileReceivingProgress progress = getProgressFromLocalPanel(tablespacepartitionversion);
			if(file.getName().endsWith(".meta")) {
				progress.metaFileReceived(file);
else if(file.getName().endsWith(".db")) {
				progress.binaryFileReceived(file);
			}
			// this can be reached simultaneously by 2 different threads so we must synchronized it
			// (thread that downloaded the .meta file and thread that downloaded the .db file)
			synchronized(FileReceiverCallback.this) {
				if(progress.isReceivedMetaFile() && progress.isReceivedBinaryFile()) {
					// This assures that the move will only be done once
					if(new File(progress.getMetaFile()).exists() && new File(progress.getBinaryFile()).exists()) {
						// check if we already have the binary & meta -> then move partition
						// and then remove this action from the panel so that it's completed.
						try {
							// we need to remove existing files if they exist
							// they might be stalled from old deployments
							File meta = getLocalMetadataFile(tablespacepartitionversion);
							if(meta.exists()) {
								meta.delete();
							}
							FileUtils.moveFile(new File(progress.getMetaFile()), meta);
							File binaryToMove = new File(progress.getBinaryFile());
							File binary = new File(getLocalStorageFolder(tablespacepartitionversion),
							    binaryToMove.getName());
							if(binary.exists()) {
								binary.delete();
							}
							FileUtils.moveToDirectory(binaryToMove,
							    getLocalStorageFolder(tablespacepartitionversion), true);
							.info("Balance action successfully completed, received both .db and .meta files ("
							    + tablespace + ", " + partition + ", " + version + ")");
							// Publish new changes to HZ
catch(IOException e) {
finally {
							removeBalanceActionFromHZPanel(tablespacepartitionversion);
						}
					}
				}
			}
		}
		public void onBadCRC(String tablespaceInteger partitionLong versionFile file) {
			removeBalanceActionFromHZPanel(tablespacepartitionversion);
		}
		public void onError(Throwable tString tablespaceInteger partitionLong versionFile file) {
			removeBalanceActionFromHZPanel(tablespacepartitionversion);
		}
		// --- Helper methods --- //
Will remove the BalanceAction associated with this file receiving from HZ data structure.
		private synchronized void removeBalanceActionFromHZPanel(String tablespaceInteger partition,
		    Long version) {
			// first remove the local tracking of this action
			String lookupKey = tablespace + "_" + partition + "_" + version;
				// then remove from HZ
				BalanceAction actionToRemove = null;
				    .entrySet()) {
					BalanceAction action = actionEntry.getKey();
					if(action.getTablespace().equals(tablespace) && action.getPartition() == partition
					    && action.getVersion() == version && action.getFinalNode().equals(.address())) {
						actionToRemove = action;
					}
				}
				if(actionToRemove == null) {
					// no need to worry - another thread might have gone into this code already almost simultaneously
else {
					.info("Removed balance action [" + actionToRemove + "] from HZ panel.");
				}
			}
		}

Will obtain a bean to fill some progress in a local hashmap or create it and put it otherwise.
		    Integer partitionLong version) {
			String lookupKey = tablespace + "_" + partition + "_" + version;
			if(progress == null) {
				progress = new BalanceFileReceivingProgress(tablespacepartitionversion);
				.put(lookupKeyprogress);
			}
			return progress;
		}
	}

Initialization logic: initialize things, connect to ZooKeeper, create Thrift server, etc.

	public void init(SploutConfiguration configthrows Exception {
		this. = config;
		long evictionSeconds = config.getLong(.);
		int maxCachePools = config.getInt(.);
		// We create a Cache for holding SQL connection pools to different tablespace versions
		// http://stackoverflow.com/questions/2583429/how-to-differentiate-between-time-to-live-and-time-to-idle-in-ehcache
		 = new Cache("dbCache"maxCachePoolsfalsefalse.evictionSeconds);
		if( == null) {
			// The Fetcher in charge of downloading new deployments
			this. = new Fetcher(config);
		}
		// When a tablespace version is expired, the connection pool is closed by an expiration handler
		// The thread that will execute deployments asynchronously
		// A thread that will listen to file exchanges through HTTP
		// Connect with the cluster.
		 = Hazelcast.newHazelcastInstance(HazelcastConfigBuilder.build(config));
		// Add shutdown hook
		Runtime.getRuntime().addShutdownHook(new Thread() {
			public void run() {
				try {
					.info("Shutdown hook called - trying to gently stop DNodeHandler " + whoAmI() + " ...");
catch(Throwable e) {
					.error("Error in ShutdownHook"e);
				}
			}
		});
	}

Registers the dnode in the cluster. This gives green ligth to use it.
	public void giveGreenLigth() {
		    minutesToCheckRegisteroldestMembersLeading);
	}

Deletes the files and folders kept by the DNode for a particular tablespace and version.
	private void deleteLocalVersion(com.splout.db.thrift.TablespaceVersion versionthrows IOException {
		File tablespaceFolder = new File(dataFolderversion.getTablespace());
		File versionFolder = new File(tablespaceFolderversion.getVersion() + "");
		if(versionFolder.exists()) {
			File[] partitions = versionFolder.listFiles();
			if(partitions != null) {
				for(File partitionpartitions) {
					if(partition.isDirectory()) {
						// remove references to binary SQLite files in ECache
						// so that space in disk is immediately available
						String dbKey = version.getTablespace() + "_" + version.getVersion() + "_" + partition.getName();
						synchronized() {
							if(.get(dbKey) != null) {
								.info("-- Removing references from ECache: " + dbKey); 
							}
						}
					}
				}
			}
			FileUtils.deleteDirectory(versionFolder);
			.info("-- Successfully removed " + versionFolder);
else {
			// Could happen, nothing to worry
		}
	}

Thrift RPC method -> Given a tablespace and a version, execute the SQL query
	public String sqlQuery(String tablespacelong versionint partitionString query)
	    throws DNodeException {
		String t = Thread.currentThread().getName();
		// Because SQLiteConnection can only be closed by owner Thread, here we need to check if we 
		// have some pending connections to close...
		if(pendingClose != null && pendingClose.size() > 0) {
			synchronized(pendingClose) {
				Iterator<SQLiteConnectionit = pendingClose.iterator();
				while(it.hasNext()) {
					SQLiteConnection conn = it.next();
					.info("-- Closed a connection pending diposal: " + conn.getDatabaseFile());
					conn.dispose();
					it.remove();
				}
			}
		}
		try {
			try {
				// Look for the EHCache database pool cache
				String dbKey = tablespace + "_" + version + "_" + partition;
				Element dbPoolInCache = null;
				synchronized() {
					dbPoolInCache = .get(dbKey);
					if(dbPoolInCache == null) {
						File dbFolder = getLocalStorageFolder(tablespacepartitionversion);
						if(!dbFolder.exists()) {
							.warn("Asked for " + dbFolder + " but it doesn't exist!");
							throw new DNodeException("Requested tablespace (" + tablespace
							    + ") + version (" + version + ") is not available.");
						}
						// Currently using first ".db" file but in the future there might be some convention
						for(String file : dbFolder.list()) {
							if(file.endsWith(".db")) {
								// Create new EHCache item value with a {@link SQLite4JavaManager}
								File metadata = getLocalMetadataFile(tablespacepartitionversion);
								ThriftReader reader = new ThriftReader(metadata);
								PartitionMetadata partitionMetadata = (PartitionMetadatareader
								    .read(new PartitionMetadata());
								reader.close();
								SQLite4JavaManager manager = new SQLite4JavaManager(dbFolder + "/" + file,
								    partitionMetadata.getInitStatements());
								dbPoolInCache = new Element(dbKeymanager);
								.put(dbPoolInCache);
								break;
							}
						}
					}
				}
				if(dbPoolInCache != null) {
					// Query the {@link SQLite4JavaManager} and return
					String result = ((SQLite4JavaManagerdbPoolInCache.getObjectValue()).query(query,
					long time = .endQuery();
					.info("serving query [" + tablespace + "]" + " [" + version + "] [" + partition + "] ["
					    + query + "] time [" + time + "] OK.");
					// double prob = performanceTool.getHistogram().getLeftAccumulatedProbability(time);
					// if(prob > 0.95) {
					// // slow query!
					// log.warn("[SLOW QUERY] Query time over 95 percentil: [" + query + "] time [" + time + "]");
					// slowQueries++;
					// }
					if(time > ) {
						// slow query!
						.warn("[SLOW QUERY] Query time over absolute slow query time (" + 
						    + ") : [" + query + "] time [" + time + "]");
					}
					return result;
else {
					throw new DNodeException(
					    "Deployed folder doesn't contain a .db file - This shouldn't happen. This means there is a bug or inconsistency in the deploy process.");
				}
catch(Throwable e) {
			}
catch(DNodeException e) {
			.info("serving query [" + tablespace + "]" + " [" + version + "] [" + partition + "] [" + query
			    + "] FAILED [" + e.getMsg() + "]");
			throw e;
		}
	}
	private void abortDeploy(long versionString errorMessage) {
		panel.put(whoAmI(), errorMessage);
	}

Thrift RPC method -> Given a list of com.splout.db.thrift.DeployActions and a version identifying the deployment perform an asynchronous deploy.
	public String deploy(final List<DeployActiondeployActionsfinal long versionthrows DNodeException {
		try {
			synchronized() {
				/*
				 * Here we instantiate a Thread for waiting for the deploy so that we are able to implement deploy timeout... If
				 * the deploy takes too much then we cancel it. We achieve this by using Java asynchronous Future objects.
				 */
				Thread deployWait = new Thread() {
					public void run() {
						Future<?> future = .submit(new Runnable() {
							// This code is executed by the solely deploy thread, not the one who waits
							public void run() {
								try {
									.info("Starting deploy actions [" + deployActions + "]");
									long start = System.currentTimeMillis();
									for(DeployAction action : deployActions) {
										// 1- Store metadata
										File metadataFile = getLocalMetadataFile(action.getTablespace(),
										    action.getPartition(), version);
										if(!metadataFile.getParentFile().exists()) {
											metadataFile.getParentFile().mkdirs();
										}
										ThriftWriter writer = new ThriftWriter(metadataFile);
										writer.write(action.getMetadata());
										writer.close();
										// 2- Call the fetcher for fetching
										File fetchedContent = .fetch(action.getDataURI());
										// If we reach this point then the fetch has been OK
										File dbFolder = getLocalStorageFolder(action.getTablespace(), action.getPartition(),
										    version);
										if(dbFolder.exists()) { // If the new folder where we want to deploy already exists means it is
											                      // somehow
											                      // stalled from a previous failed deploy - it is ok to delete it
											FileUtils.deleteDirectory(dbFolder);
										}
										// 4- Perform a "mv" for finally making the data available
										FileUtils.moveDirectory(fetchedContentdbFolder);
									}
									// Publish new DNodeInfo in distributed registry.
									// This makes QNodes notice that a new version is available...
									// PartitionMap and ReplicationMap will be built incrementally as DNodes finish.
									long end = System.currentTimeMillis();
									.info("Local deploy actions [" + deployActions + "] successfully finished in "
									    + (end - start) + " ms.");
catch(Throwable t) {
									// In order to avoid stale deployments, we flag this deploy to be aborted
									.warn("Error deploying [" + deployActions + "] barrier + [" + version + "]"t);
									abortDeploy(version, ExceptionUtils.getStackTrace(t));
finally {
									// Decrement the countdown latch. On 0, deployer knows that the deploy
									// finished.
									countdown.countDown();
								}
							}
						});
						try {
							// This line makes the wait thread wait for the deploy as long as the configuration tells
							// If the timeout passes a TimeoutException is thrown
catch(InterruptedException e) {
							.warn("Interrupted exception waiting for local deploy to finish - killing deployment",
							    e);
							abortDeploy(version, ExceptionUtils.getStackTrace(e));
							future.cancel(true);
catch(ExecutionException e) {
							.warn("Execution exception waiting for local deploy to finish - killing deployment."e);
							abortDeploy(version, ExceptionUtils.getStackTrace(e));
catch(TimeoutException e) {
							.warn("Timeout waiting for local deploy to finish - killing deployment."e);
							abortDeploy(version,
							    "Timeout reached - " + .getInt(.)
							        + " seconds");
							future.cancel(true);
						}
					}
				};
				deployWait.start();
			}
			// Everything is asynchronous so this is quickly reached - it just means the process has started
			return JSONSerDe.ser(new DNodeStatusResponse("Ok. Deploy initiated"));
catch(Throwable t) {
		}
	}

Thrift RPC method -> Given a list of com.splout.db.thrift.RollbackActions, perform a synchronous rollback
	public String rollback(List<RollbackActionrollbackActionsString ignoreMethrows DNodeException {
		// The DNode doesn't need to do anything special for rolling back a version.
		// It can serve any version that is stored locally.
		try {
			return JSONSerDe.ser(new DNodeStatusResponse("Ok. Rollback order received."));
catch(JSONSerDeException e) {
		}
	}
	/*
	 * Any unexpected exception must be redirected to this method. In this way we can monitor it using state variables.
	 * The state variables will then be passed onto the appropriated bean in the status() RPC call.
	 */
	private void unexpectedException(Throwable t) {
		.error("Unexpected Exception"t);
	}

Returns an com.splout.db.dnode.beans.DNodeSystemStatus filled with the appropriated data.
	public String status() throws DNodeException {
		try {
			if( == null) {
				status.setSystemStatus("UP");
else {
				status.setSystemStatus("Last exception: " + );
			}
			if(folder.exists()) {
				status.setFreeSpaceInDisk(FileSystemUtils.freeSpaceKb(folder.toString()));
				status.setOccupiedSpaceInDisk(FileUtils.sizeOfDirectory(folder));
				Collection<Filefiles = FileUtils.listFilesAndDirs(folder.,
				status.setFiles(new ArrayList<String>(Lists.transform(Lists.newArrayList(files),
				    new Function<FileString>() {
					    @Override
					    public String apply(File file) {
						    return file.getAbsolutePath() + " (" + FileUtils.sizeOf(file) + " bytes)";
					    }
				    })));
				Collections.sort(status.getFiles());
else {
				status.setFreeSpaceInDisk(FileSystemUtils.freeSpaceKb("."));
				status.setFiles(new ArrayList<String>());
			}
			return JSONSerDe.ser(status);
catch(Throwable t) {
		}
	}
	protected File getLocalStorageFolder(String tablespaceint partitionlong version) {
		return getLocalStorageFolder(tablespacepartitionversion);
	}

Returns the folder where the DNode that uses the provided Configuration will store the binary data for this tablespace, version and partition.
	public static File getLocalStorageFolder(SploutConfiguration configString tablespaceint partition,
	    long version) {
		return new File(dataFolder + "/"
		    + getLocalStoragePartitionRelativePath(tablespacepartitionversion));
	}
	public static String getLocalStoragePartitionRelativePath(String tablespaceint partition,
	    long version) {
		return tablespace + "/" + version + "/" + partition;
	}
	protected File getLocalMetadataFile(String tablespaceint partitionlong version) {
		return getLocalMetadataFile(tablespacepartitionversion);
	}

Returns the file where the DNode that uses the provided Configuration will store the metadata for this tablespace, version and partition.
	public static File getLocalMetadataFile(SploutConfiguration configString tablespaceint partition,
	    long version) {
		return new File(dataFolder + "/" + getLocalMetadataFileRelativePath(tablespacepartitionversion));
	}
	public static String getLocalMetadataFileRelativePath(String tablespaceint partitionlong version) {
		return tablespace + "/" + version + "/" + partition + ".meta";
	}
	public boolean isDeployInProgress() {
		return .get() > 0;
	}

Properly dispose this DNode.
	public void stop() throws Exception {
	}
	public String abortDeploy(long versionthrows DNodeException {
		// For simplicity, current implementation cancels all queued deploys.
		// There can only be one deploy being handled at a time, but multiple may have been queued.
		try {
				synchronized() { // No new deploys to be handled until we cancel the current one
					// Note that it is not always guaranteed that threads will be properly shutdown...
						try {
							Thread.sleep(100);
catch(InterruptedException e) {
							.error("Deploy Thread interrupted - continuing anyway."e);
						}
					}
				}
				return JSONSerDe.ser(new DNodeStatusResponse("Ok. Deploy cancelled."));
else {
				return JSONSerDe.ser(new DNodeStatusResponse("No deploy in progress."));
			}
catch(JSONSerDeException e) {
		}
	}
	    throws DNodeException {
		for(com.splout.db.thrift.TablespaceVersion version : versions) {
			.info("Going to remove " + version + " as I have been told to do so.");
			try {
catch(Throwable e) {
			}
		}
		try {
			// Publish new DNodeInfo in distributed registry.
			// This makes QNodes notice that a new version is available...
			// PartitionMap and ReplicationMap will be built incrementally as DNodes finish.
			return JSONSerDe.ser(new DNodeStatusResponse("Ok. Delete old versions executed."));
catch(JSONSerDeException e) {
		}
	}
	// ----------------- TEST API ----------------- //
	/*
	 * This method is called by unit / integration tests in order to simulate failures and recoveries in DNodes and such.
	 */
	public String testCommand(String commandStrthrows DNodeException {
			throw new DNodeException("Can't handle test commands as "
			    + . + " is not set to true.");
		}
		TestCommands command = TestCommands.valueOf(commandStr);
		if(command == null) {
			throw new DNodeException("Unknown test command: " + commandStr);
		}
			// on-demand shutdown
			// This is a "soft-shutdown" so we can recover from it.
			// It is designed for unit and integration testing.
			.info("Received a shutdown by test API.");
else if(command.equals(.)) {
			// on-demand restart
			// This is a "soft-restart" after a "soft-shutdown".
			// It is designed for unit and integration testing.
			try {
				 = Hazelcast.newHazelcastInstance(HazelcastConfigBuilder.build());
				.info("Received a restart by test API.");
				.error("Error while trying to connect to Hazelcast"e);
			}
		}
		try {
			return JSONSerDe.ser(new DNodeStatusResponse("Ok. Test command " + commandStr
			    + " received properly."));
catch(JSONSerDeException e) {
		}
	}
	// --- Getters mainly for testing --- /
		return ;
	}
	}
New to GrepCode? Check out our FAQ X