Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.splout.db.qnode;
  
  /*
   * #%L
   * Splout SQL Server
   * %%
   * Copyright (C) 2012 Datasalt Systems S.L.
   * %%
   * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Affero General Public License as published by
  * the Free Software Foundation, either version 3 of the License, or
  * (at your option) any later version.
  * 
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  * 
  * You should have received a copy of the GNU Affero General Public License
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  * #L%
  */
 
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import  org.apache.thrift.transport.TTransportException;
 
The Deployer is a specialized module (com.splout.db.qnode.QNodeHandlerModule) of the com.splout.db.qnode.QNode that performs the business logic associated with a distributed deployment. It is used by the com.splout.db.qnode.QNodeHandler.
 
 @SuppressWarnings({ "unchecked""rawtypes" })
 public class Deployer extends QNodeHandlerModule {
 
 	private final static Log log = LogFactory.getLog(Deployer.class);
 
 	@SuppressWarnings("serial")
 	public static class UnexistingVersion extends Exception {
 
 		public UnexistingVersion() {
 			super();
 		}
 
 		public UnexistingVersion(String message) {
 			super(message);
 		}
 	}

Runnable that deals with the asynchronous part of the deployment. Particularly, it waits until DNodes finish their work, and then performs the version switch.
 
 	public class ManageDeploy implements Runnable {
 
 		// Number of seconds to wait until another
 		// check to see if timeout was reached or
 		// if a DNode failed.
 		private long secondsToCheckFailureOrTimeout = 60l;
 
 		private long version;
 		private List<Stringdnodes;
 		private long timeoutSeconds;
 		private boolean isReplicaBalancingEnabled;
 
 		public ManageDeploy(List<StringdnodesList<DeployRequestdeployRequestslong version,
 		    long timeoutSecondslong secondsToCheckFailureOrTimeoutboolean isReplicaBalancingEnabled) {
			this. = dnodes;
			this. = deployRequests;
			this. = version;
			this. = timeoutSeconds;
			this. = secondsToCheckFailureOrTimeout;
			this. = isReplicaBalancingEnabled;
		}
		public void run() {
			.info(.getConfig().getProperty(.) + " Executing deploy for version ["
			    +  + "]");
			try {
				long waitSeconds = 0;
				ICountDownLatch countDownLatchForDeploy = .getCoordinationStructures()
				boolean finished;
				do {
					finished = countDownLatchForDeploy.await(.);
					if(!finished) {
						// If any of the DNodes failed, then we cancel the deployment.
							return;
						}
						// Let's see if we reached the timeout.
						// Negative timeoutSeconds => waits forever
						if(waitSeconds >  &&  >= 0) {
							.warn("Deploy of version [" +  + "] timed out. Reached [" + waitSeconds
							    + "] seconds.");
							return;
						}
					}
while(!finished);
				.info("All DNodes performed the deploy of version [" + 
				    + "]. Publishing tablespaces...");
				// We finish by publishing the versions table with the new versions.
				try {
catch(UnexistingVersion e) {
					throw new RuntimeException(
					    "Unexisting version after deploying this version. Sounds like a bug."e);
				}
				.info("Deploy of version [" +  + "] Finished PROPERLY. :-)");
				// After a deploy we must synchronize tablespace versions to see if we have to remove some.
				// If some replicas are under-replicated, start a balancing process
catch(MemberLeftException e) {
				.error("Error while deploying version [" +  + "]"e);
				.error("Error while deploying version [" +  + "]"e);
catch(InterruptedException e) {
				.error("Error while deploying version [" +  + "]"e);
catch(Throwable t) {
				throw new RuntimeException(t);
finally {
			}
		}

Compose the list of switch actions to switch

Returns:
			}
			return actions;
		}

Log DNodes errors in deployment
		private void explainErrors() {
			    );
			String msg = "Deployment of version [" +  + "] failed in DNode[";
			for(Entry<StringStringentry : deployErrorPanel.entrySet()) {
				.error(msg + entry.getKey() + "] - it failed with the error [" + entry.getValue() + "]");
			}
		}

Return true if one or more of the DNodes reported an error.
		private boolean checkForFailure() {
			    );
				return !deployErrorPanel.isEmpty();
			}
			// If replica balancing is enabled we check whether we could survive after the failed DNodes
			Set<StringfailedDNodes = new HashSet<String>(deployErrorPanel.keySet());
			// Check if deploy needs to be canceled or if the system could auto-rebalance itself afterwards
			for(DeployRequest deployRequest : ) {
				for(ReplicationEntry repEntrydeployRequest.getReplicationMap()) {
					if(failedDNodes.containsAll(repEntry.getNodes())) {
						// There is AT LEAST one partition that depends on the failed DNodes so the deploy must fail!
						return true;
					}
				}
			}
			return false;
		}
/* End ManageDeploy */

The Deployer deals with deploy and switch version requests.
	public Deployer(QNodeHandlerContext context) {
		super(context);
	}

Call this method for starting an asynchronous deployment given a proper deploy request - proxy method for QNodeHandler. Returns a QueryStatus with the status of the request.

Throws:
InterruptedException
	public DeployInfo deploy(List<DeployRequestdeployRequeststhrows InterruptedException {
		// A new unique version number is generated.
		// Generate the list of actions per DNode
		Map<StringList<DeployAction>> actionsPerDNode = generateDeployActionsPerDNode(deployRequests,
		    version);
		// Starting the countdown latch.
		ICountDownLatch countDownLatchForDeploy = .getCoordinationStructures()
		    .getCountDownLatchForDeploy(version);
		Set<StringdnodesInvolved = actionsPerDNode.keySet();
		countDownLatchForDeploy.setCount(dnodesInvolved.size());
		// Sending deploy signals to each DNode
		for(Map.Entry<StringList<DeployAction>> actionPerDNode : actionsPerDNode.entrySet()) {
			DNodeService.Client client = null;
			boolean renew = false;
			try {
				try {
					client = .getDNodeClientFromPool(actionPerDNode.getKey());
catch(TTransportException e) {
					renew = true;
					throw e;
				}
				client.deploy(actionPerDNode.getValue(), version);
catch(Exception e) {
				.error("Error sending deploy actions to DNode [" + actionPerDNode.getKey() + "]"e);
				abortDeploy(new ArrayList<String>(actionsPerDNode.keySet()), version);
				DeployInfo errDeployInfo = new DeployInfo();
				errDeployInfo.setError("Error connecting to DNode " + actionPerDNode.getKey());
				return errDeployInfo;
finally {
				.returnDNodeClientToPool(actionPerDNode.getKey(), clientrenew);
			}
		}
		// Initiating an asynchronous process to manage the deployment
		.execute(new ManageDeploy(new ArrayList(actionsPerDNode.keySet()), deployRequests,
		DeployInfo deployInfo = new DeployInfo();
		deployInfo.setVersion(version);
		deployInfo.setStartedAt(SimpleDateFormat.getInstance().format(new Date()));
		return deployInfo;
	}

DNodes are informed to stop the deployment, as something failed.

Throws:
InterruptedException
	public void abortDeploy(List<Stringdnodeslong version) {
		for(String dnode : dnodes) {
			DNodeService.Client client = null;
			boolean renew = false;
			try {
				try {
catch(TTransportException e) {
					renew = true;
					throw e;
				}
				client.abortDeploy(version);
catch(Exception e) {
				.error("Error sending abort deploy flag to DNode [" + dnode + "]"e);
finally {
				if(client != null) {
					.returnDNodeClientToPool(dnodeclientrenew);
				}
			}
		}
	}

Switches current versions being served for some tablespaces, in an atomic way.
	public void switchVersions(List<SwitchVersionRequestswitchRequestthrows UnexistingVersion {
		// We compute the new versions table, and then try to update it
		// We use optimistic locking: we read the original
		// map and try to update it. If the original has changed during
		// this process, we retry: reload the original map, ...
		Map<StringLongversionsTable;
		Map<StringLongnewVersionsTable;
		do {
			newVersionsTable = new HashMap<StringLong>();
			if(versionsTable != null) {
				newVersionsTable.putAll(versionsTable);
			}
			for(SwitchVersionRequest req : switchRequest) {
				newVersionsTable.put(tsv.getTablespace(), tsv.getVersion());
			}
		    newVersionsTable));
	}

Generates the list of individual deploy actions that has to be sent to each DNode.
	    List<DeployRequestdeployRequestslong version) {
		long deployDate = System.currentTimeMillis(); // Here is where we decide the data of the deployment for all deployed
																									// tablespaces
		for(DeployRequest req : deployRequests) {
			for(Object obj : req.getReplicationMap()) {
				PartitionEntry pEntry = null;
				for(PartitionEntry partEntry : req.getPartitionMap()) {
					if(partEntry.getShard() == rEntry.getShard()) {
						pEntry = partEntry;
					}
				}
				if(pEntry == null) {
					throw new RuntimeException("No Partition metadata for shard: " + rEntry.getShard()
					    + " this is very likely to be a software bug.");
				}
				// Normalize DNode ids -> The convention is that DNodes are identified by host:port . So we need to strip the
				// protocol, if any
				for(int i = 0; i < rEntry.getNodes().size(); i++) {
					String dnodeId = rEntry.getNodes().get(i);
					if(dnodeId.startsWith("tcp://")) {
						dnodeId = dnodeId.substring("tcp://".length(), dnodeId.length());
					}
					rEntry.getNodes().set(idnodeId);
				}
				for(String dNode : rEntry.getNodes()) {
					List<DeployActionactionsSoFar = (List<DeployAction>) MapUtils.getObject(actionsdNode,
					    new ArrayList<DeployAction>());
					actions.put(dNodeactionsSoFar);
					DeployAction deployAction = new DeployAction();
					deployAction.setDataURI(req.getData_uri() + "/" + rEntry.getShard() + ".db");
					deployAction.setTablespace(req.getTablespace());
					deployAction.setVersion(version);
					deployAction.setPartition(rEntry.getShard());
					// Add partition metadata to the deploy action for DNodes to save it
					metadata.setMinKey(pEntry.getMin());
					metadata.setMaxKey(pEntry.getMax());
					metadata.setNReplicas(rEntry.getNodes().size());
					metadata.setDeploymentDate(deployDate);
					deployAction.setMetadata(metadata);
					actionsSoFar.add(deployAction);
				}
			}
		}
		return actions;
	}
New to GrepCode? Check out our FAQ X