Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.splout.db.dnode;
  
  /*
   * #%L
   * Splout SQL Server
   * %%
   * Copyright (C) 2012 Datasalt Systems S.L.
   * %%
   * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Affero General Public License as published by
  * the Free Software Foundation, either version 3 of the License, or
  * (at your option) any later version.
  * 
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  * GNU General Public License for more details.
  * 
  * You should have received a copy of the GNU Affero General Public License
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  * #L%
  */
 
 
 import  org.apache.thrift.TException;
 import  org.apache.thrift.TProcessor;
 import  org.apache.thrift.protocol.TProtocol;
 import  org.apache.thrift.server.TServer;
 import  org.apache.thrift.transport.TServerTransport;
 import  org.apache.thrift.transport.TTransport;
 import  org.apache.thrift.transport.TTransportException;

TThreadPoolServer (approximately) patched with: https://issues.apache.org/jira/secure/attachment/12444333/THRIFT-692.patch.untested.txt
 
 public class CustomTThreadPoolServer extends TServer {
 
 	private static final Log LOGGER = LogFactory.getLog(CustomTThreadPoolServer.class.getName());
 
 	public static class Args extends AbstractServerArgs<Args> {
 		public int minWorkerThreads = 5;
 		public int maxWorkerThreads = .;
 		public int stopTimeoutVal = 60;
 
 		public Args(TServerTransport transport) {
 			super(transport);
 		}
 
 		public Args minWorkerThreads(int n) {
 			return this;
 		}
 
 		public Args maxWorkerThreads(int n) {
 			return this;
 		}
 	}
 
 	// Executor service for handling client connections
 
 	// Flag for stopping the server
 	private volatile boolean stopped_;
 
 	private final TimeUnit stopTimeoutUnit;
 
 	private final long stopTimeoutVal;
 
 	public CustomTThreadPoolServer(Args args) {
 		super(args);
 
 		    args.maxWorkerThreads);
 
 		 = args.stopTimeoutUnit;
 		 = args.stopTimeoutVal;
 
 		 = new ThreadPoolExecutor(args.minWorkerThreadsargs.maxWorkerThreads, 60,
 		    .executorQueue);
 	}
 
 	public void serve() {
 		try {
 			serverTransport_.listen();
 		} catch(TTransportException ttx) {
 			.error("Error occurred during listening."ttx);
 			return;
 		}
 
		 = false;
		while(!) {
			int failureCount = 0;
			try {
				TTransport client = serverTransport_.accept();
				WorkerProcess wp = new WorkerProcess(client);
				try {
					++failureCount;
					.warn("Execution rejected."ree);
					client.close();
				}
catch(TTransportException ttx) {
				if(!) {
					++failureCount;
					.warn("Transport error occurred during acceptance of message."ttx);
				}
catch(Error e) {
				if(!) {
					++failureCount;
					.warn("Uncaught error."e);
				}
			}
		}
		// Loop until awaitTermination finally does return without a interrupted
		// exception. If we don't do this, then we'll shut down prematurely. We want
		// to let the executorService clear it's task queue, closing client sockets
		// appropriately.
		long now = System.currentTimeMillis();
		while(timeoutMS >= 0) {
			try {
				break;
catch(InterruptedException ix) {
				long newnow = System.currentTimeMillis();
				timeoutMS -= (newnow - now);
				now = newnow;
			}
		}
		setServing(false);
	}
	public void stop() {
		 = true;
		serverTransport_.interrupt();
	}
	private class WorkerProcess implements Runnable {

Client that this services.
		private TTransport client_;

Default constructor.

Parameters:
client Transport to process
		private WorkerProcess(TTransport client) {
			 = client;
		}

Loops on processing a client forever
		public void run() {
			TProcessor processor = null;
			TTransport inputTransport = null;
			TTransport outputTransport = null;
			TProtocol inputProtocol = null;
			TProtocol outputProtocol = null;
			try {
				processor = processorFactory_.getProcessor();
				inputTransport = inputTransportFactory_.getTransport();
				outputTransport = outputTransportFactory_.getTransport();
				inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
				outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
				// we check stopped_ first to make sure we're not supposed to be shutting
				// down. this is necessary for graceful shutdown.
				while(! && processor.process(inputProtocoloutputProtocol)) {
				}
catch(TTransportException ttx) {
				// Assume the client died and continue silently
catch(TException tx) {
				.error("Thrift error occurred during processing of message."tx);
catch(Exception x) {
				.error("Error occurred during processing of message."x);
			}
			if(inputTransport != null) {
				inputTransport.close();
			}
			if(outputTransport != null) {
				outputTransport.close();
			}
		}
	}
New to GrepCode? Check out our FAQ X