Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
   /*
    *  Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
    *
    *  This library is free software; you can redistribute it and/or
    *  modify it under the terms of the GNU Lesser General Public
    *  License as published by the Free Software Foundation; either
    *  version 2.1 of the License, or (at your option) any later version.
    *
    *  This library is distributed in the hope that it will be useful,
   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   *  Lesser General Public License for more details.
   *
   *  You should have received a copy of the GNU Lesser General Public
   *  License along with this library; if not, write to the Free Software
   *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
   *
   * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
   * The latest copy of this software may be found on http://www.xsocket.org/
   */
  package org.xsocket.connection.multiplexed;
  
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Set;
  import java.util.Timer;
  
  
Implementation of the IMultiplexedConnection

Author(s):
grro@xsocket.org
  
  public final class MultiplexedConnection implements IMultiplexedConnection {
  
  	private static final Logger LOG = Logger.getLogger(MultiplexedConnection.class.getName());
  
  	// timer
  	private static final Timer TIMER = new Timer("xPipelineTimer"true); 
  
  	private static final long MIN_WATCHDOG_PERIOD_MILLIS = 30 * 1000;
  	
  	// pipelines
  
  	
  	// underlying connection
  	private INonBlockingConnection connection = null;
  
  	
  	// multiplexer
  	private IMultiplexer multiplexer = null;
  	private final Object multiplexerWriteGuard = new Object();
  	private final Object multiplexerReadGuard = new Object();
  
  	// demultiplex result handler
  	
  	
  	// handler adapter
  	private PipelineHandlerAdapter handlerAdapter = PipelineHandlerAdapter.newInstance(null);



constructor. The org.xsocket.connection.multiplexed.multiplexer.SimpleMultiplexer class will be used to (de)multiplex the data

Parameters:
connection the underlying connection
Throws:
java.io.IOException if an exception occurs
 
 	public MultiplexedConnection(INonBlockingConnection connectionthrows IOException {
 		this(connectionnew SimpleMultiplexer());
 	}


constructor.

Parameters:
connection the underlying connection
multiplexer the multiplexer to use
Throws:
java.io.IOException if an exception occurs
 
 	public MultiplexedConnection(INonBlockingConnection connectionIMultiplexer multiplexerthrows IOException {
 		this(connection, PipelineHandlerAdapter.newInstance(null), multiplexer);
 	}


constructor.

Parameters:
connection the underlying connection
pipelineHandler the pipeline handler
multiplexer the multiplexer to use
Throws:
java.io.IOException if an exception occurs
 
 	public MultiplexedConnection(INonBlockingConnection connectionIHandler pipelineHandlerIMultiplexer multiplexerthrows IOException {
 		this(connection, PipelineHandlerAdapter.newInstance(pipelineHandler), multiplexer);
 	}


constructor. The org.xsocket.connection.multiplexed.multiplexer.SimpleMultiplexer class will be used to (de)multiplex the data

Parameters:
connection the underlying connection
pipelineHandler the pipeline handler
Throws:
java.io.IOException if an exception occurs
 
 	public MultiplexedConnection(INonBlockingConnection connectionIHandler pipelineHandlerthrows IOException {
 		this(connection, PipelineHandlerAdapter.newInstance(pipelineHandler), new SimpleMultiplexer());
 	}


internal constructor
 
 	MultiplexedConnection(INonBlockingConnection connectionPipelineHandlerAdapter handlerAdapterIMultiplexer multiplexerthrows IOException {
 		this. = connection;
 		this. = multiplexer;		
 		this. = handlerAdapter;
 	
 		connection.setAutoflush(false);
 	
 		connection.setHandler(new MultiplexedConnectionHandler(this));
 	}



 
 	public boolean isOpen() {
 		return .isOpen();
 	}


 
 	@SuppressWarnings("unchecked")
 	public void close() throws IOException {
 		
 		HashMap<StringNonBlockingPipelinecopy = null;
 		synchronized () {
 		}
 		
 		for (NonBlockingPipeline pipeline : copy.values()) {
 			pipeline.closeSilence();
 		}
 
 		try {
 		} catch (Exception e) { 
 				.fine("[" + getId() + "] error occured by closing connection " +e.toString());
 			}
 		}
 	}


 
 	public String getId() {
 		return .getId();
 	}


 
 	public void activateSecuredMode() throws IOException {
 		synchronized () {
 			for (NonBlockingPipeline pipline : .values()) {
 				pipline.flush();
 			}
 		}
 		
 	}


 
 	public boolean isSecure() {
 		return .isSecure();
 	}


 
 	public String getDefaultEncoding() {
 	}


 
 	public void setDefaultEncoding(String encoding) {
 	}


 
 	}


 
 	public int getLocalPort() {
 	}


 
 	}


 
 	public int getRemotePort() {
 	}


 
 	public Object getOption(String namethrows IOException {
 		return .getOption(name);
 	}


 
 	@SuppressWarnings("unchecked")
 	public Map<StringClassgetOptions() {
 	}


 
 	public void setOption(String nameObject valuethrows IOException {
 		.setOption(namevalue);
 	}


 
 	public void setConnectionTimeoutMillis(long timeoutMillis) {
 	}


 
 	public long getConnectionTimeoutMillis() {
 	}


 
 	public void setIdleTimeoutMillis(long timeoutMillis) {
 	}


 
 	public long getIdleTimeoutMillis() {
 	}


 
 	}


 
 	}


 
 	public void setAttachment(Object obj) {
 	}


 
 	public Object getAttachment() {
 	}


 
 	public String createPipeline() throws IOException {
 		String pipelineId = registerNewPipeline();
 
 		NonBlockingPipeline pipeline = new NonBlockingPipeline(pipelineId);
 		synchronized () {
 			.put(pipelineIdpipeline);
 		}
 		
 			.fine("[" + getId() + "] pipeline " + pipelineId + " created");
 		}
 		
 		return pipelineId;
 	}


 
 		String[] result = null;
 		
 		synchronized () {
 			Set<Stringids = .keySet();
 			result = ids.toArray(new String[ids.size()]);
 		}
 		
 		return result;
 	}
 	
 	
 	private void closePipeline(NonBlockingPipeline pipelinethrows IOException {
 		
 		if (pipeline == null) {
 				.fine("warning try to close a <null> pipeline");
 			}
 			return;
 		}
 
 		removePipeline(pipeline);
 	}
 
 	
 	private void removePipeline(NonBlockingPipeline pipeline) {
 		
 		NonBlockingPipeline pipe = null;
 		synchronized () {
 			pipe = .remove(pipeline.getId());
 		}
 		
 		if (pipe != null) {
 			pipeline.onClose();
 		
 				.fine("[" + getId() + "] pipeline " + pipeline.getId() + " destroyed");
 			}
 		}
 	}



 	
 		if (.isOpen()) {
 			synchronized () {
 				return .get(pipelineId);
 			}
 		} else {
 			throw new ClosedChannelException();
 		}
 	}


 
 		return new BlockingPipeline(getNonBlockingPipeline(pipelineId));
 	}
 	
 	
 	
 	private void onData() throws IOException {
 		synchronized () {
 		}
 	}
 	
 	
 	
 	private void onPipelineOpened(final String pipelineId) {
 			.fine("[" + getId() + "] pipeline " + pipelineId + " opened by peer");
 		}
 
 		NonBlockingPipeline pipeline = new NonBlockingPipeline(pipelineId);
 		synchronized () {
 			.put(pipelineIdpipeline);
 		}	
 	}
 	
 	
 	
 	
 	private void onPipelineClosed(final String pipelineId) {
 			.fine("[" + getId() + "] pipeline " + pipelineId + " closed by peer");
 		}
 
 		NonBlockingPipeline pipeline = null;
 		synchronized () {
 			pipeline = .get(pipelineId);
 		}
 
 		if (pipeline != null) {
 			removePipeline(pipeline);
 		}
 	}
 	
 
 	
 	
 	private void onPipelineData(final String pipelineIdByteBuffer[] data) {
 		NonBlockingPipeline pipeline = null;
 		synchronized () {
 			pipeline = .get(pipelineId);
 		}
 
 		if (pipeline != null) {
 				.fine("notifying pipeline data handler");
 			}
 			pipeline.onData(data);
 	
 		} else {
 				.fine("data received for non existing pipeline " + pipelineId);
 			}
 		}
 	}
 	
 	
 	
 		synchronized () {
 		}
 	}
 
 	
 	
 	private void deregisterPipeline(String pipelineIdthrows ClosedChannelExceptionIOException {
 		synchronized () {
 		}
 	}
 
 	private void sendPipelineData(String pipelineIdByteBuffer[] dataToWriteFlushMode flushModethrows ClosedChannelExceptionIOException {
 		if (dataToWrite == null) {
 			return;
 		}
 		
 		if (dataToWrite.length == 0) {
 			return;
 		}
 		
 		synchronized () {
 			.multiplex(pipelineIddataToWrite);
 		}
 	}
 	
 
 	
 	private static final class BlockingPipeline extends BlockingConnection implements IBlockingPipeline {
 		private INonBlockingPipeline delegee = null;
 		
 			super(delegee);
 			this. = delegee;
 		}
 		
 
 		}
 	}		
 	
 	
 	private final class DemultiplexResultHandler implements IDemultiplexResultHandler {
 		public void onPipelineOpend(String pipelineId) {
 		}
 		
 		public void onPipelineClosed(String pipelineId) {
 		}
 		
 		public void onPipelineData(String pipelineIdByteBuffer[] data) {
 			MultiplexedConnection.this.onPipelineData(pipelineIddata);			
 		}
 	}
 
 	
 	
 	
 	
 					
 		
 		private MultiplexedConnectionHandler(MultiplexedConnection multiplexedConnection) {
 			this. = multiplexedConnection;
 		}
 				
 
 			if (connection.available() > 0) {
 				onData(connection);
 			}
 			
 			return true;
 		}
 		
 			return true;
 		}
 		
 		
 		public boolean onDisconnect(INonBlockingConnection connectionthrows IOException {
 			return true;
 		}
 		
 		
 		public boolean onIdleTimeout(INonBlockingConnection connectionthrows IOException {
 			return true;
 		}
 		
 		
 		public boolean onConnectionTimeout(INonBlockingConnection connectionthrows IOException {
 			return true;
 		}
 	}
 	
 	
 	
 	
 	
 	
 	
 	
 	private final class NonBlockingPipeline extends AbstractNonBlockingStream implements INonBlockingPipeline {
 		
 		
 		// the pipeline id
 		private String pipelineId = null;
 
 		// close flag
 		private boolean isOpen = true;
 		
 		// suspend support
 		private boolean isSuspendRead = false;
 		private final ArrayList<ByteBuffersuspendBuffer = new ArrayList<ByteBuffer>();
 
 
 		
 		// timeouts 
 		private long idleTimeoutDateMillis = .;
 		private long lastReceivedMillis = System.currentTimeMillis();
 		
 		
 		private boolean idleTimeoutOccured = false;
 		private boolean connectionTimeoutOccured = false;
 	
 		
 		private WatchDogTask watchDogTask = null;
 		
 	
 		// handler
 		private final AtomicReference<IHandlerhandler = new AtomicReference<IHandler>(null);
 		
 		
 		NonBlockingPipeline(String pipelineIdPipelineHandlerAdapter handlerAdapter) {
 			this. = pipelineId;
 			
 			.set(handlerAdapter.getConnectionInstance());
 			onOpen();
 		}
 
 		
 		private void onOpen() {
 			try {
 			} catch (IOException ioe) {
 					.fine("error occured by performing onConnect call back on " + .get() + " " + ioe.toString());
 			}
 		}
 
 		
 
 		protected boolean isDataWriteable() {
 			try {
 				return (getNonBlockingPipeline() != null);
 			} catch (ClosedChannelException ce) {
 				return false;
 			}
 		}
 		
 		
 		protected boolean isMoreInputDataExpected() {
 			try {
 				return (getNonBlockingPipeline() != null);
 			} catch (ClosedChannelException ce) {
 				return false;
 			}
 		}
 		
 		
 		public boolean isOpen() {
 			if (!) {
 				return false;
 				
 			} else {
 				if (!isReadBufferEmpty()) {
 					return true;
 				}
 				
 				try {
 					return (getNonBlockingPipeline() != null);
 				} catch (ClosedChannelException ce) {
 					return false;
 				}
 			}
 		}
 		
 		
 		public void close() throws IOException {
 			super.close();
 
 			if () {
 			}
 			
 			 = false;			
 		}
 
 		
 		private void onClose() {
 			
 			try {
 			} catch (IOException ioe) {
 					.fine("error occured by performing onDisconnect call back on " + .get() + " " + ioe.toString());
 			}
 
 		}
 
 		
 		public void onData(ByteBuffer[] data) {
 			
 			if () {
 				for (ByteBuffer byteBuffer : data) {
 					.add(byteBuffer);	
 				}
 				return;
 			}
 			
 			int size = 0;
 			if (data != null) {
 				for (ByteBuffer byteBuffer : data) {
 					size += byteBuffer.remaining();
 				}
 			}
 				
 			if (size > 0) {
 				appendDataToReadBuffer(datasize);
 			}
 			
 			try {
 					.fine("notifying handler " + .get());
 				}
 			} catch (IOException ioe) {
 					.fine("error occured by calling onData for pipeline " + getId() + " " + ioe.toString());
 				}
 			}
 		}
 		
 		
 		
 		
 		private void onConnectionTimeout() {
 				try {
 				} catch (IOException ioe) {
 						.fine("error occured by performing onConnectionTimeout call back on " + .get() + " " + ioe.toString());
 				}
 				
 			} else {
 			}
 
 		}
 				
 		
 		private void onIdleTimeout() {
 				try {
 				} catch (IOException ioe) {
 						.fine("error occured by performing onIdleTimeout call back on " + .get() + " " + ioe.toString());
 				}
 				
 			} else {
 			}
 		}
 		
 	
 
 		
 		void closeSilence() {
 			try {
 				close();
 			} catch (IOException ioe) {
 					.fine("Error occured by closing connection " + getId() + " " + ioe.toString());
 				}
 			}
 		}
 		
 		
 		public String getId() {
 			return ;
 		}
 
 		
 		public Executor getWorkerpool() {
 		}
 		
 		public void setWorkerpool(Executor workerpool) {
 			.warning("setWorkerpool is not supported for a pipeline. perform this operation on the MultiplexedConnection");			
 		}
 		
 			return MultiplexedConnection.this;
 		}
 		
 		public void setMaxReadBufferThreshold(int size) {
 			throw new UnsupportedOperationException("setMaxReadBufferThreshold is not supported for a pipeline. perform this operation on the MultiplexedConnection");			
 		}
 		
 		public int getMaxReadBufferThreshold() {
 		}
 		
 		
 		public long getConnectionTimeoutMillis() {
 		}
 		
 		
 		public void setConnectionTimeoutMillis(long timeoutMillis) {
 			 = System.currentTimeMillis() + timeoutMillis;
 			
 			if ( != timeoutMillis) {
 				 = timeoutMillis;
 			}
 			
 		}
 		
 		
 		public long getIdleTimeoutMillis() {
 		}
 		
 		
 		public void setIdleTimeoutMillis(long timeoutMillis) {
 			 = System.currentTimeMillis() + timeoutMillis;
 			
 			if ( != timeoutMillis) {
 				 = timeoutMillis;
 			}
 			
 			 = false;
 		}
 		
 		
 		
 
 		
 		private synchronized void updateWatchdog(long connectionTimeoutMillislong idleTimeoutMillis) {
 			
 			long watchdogPeriod = connectionTimeoutMillis;
 			if (idleTimeoutMillis < watchdogPeriod) {
 				watchdogPeriod = idleTimeoutMillis
 			}
 
 			if (watchdogPeriod > 500) {
 				watchdogPeriod = watchdogPeriod / 5;
 			}
 			
 			if (watchdogPeriod > ) {
 				watchdogPeriod = ;
 			}
 			
 			
 	         = new WatchDogTask(this);
 	        .schedule(watchdogPeriodwatchdogPeriod);
 		}
 		
 		
 		private synchronized void terminateWatchDog() {
 	        if ( != null) {
 	            .cancel();
 	        }			
 		}
 
 		
 
 		private void checkTimeouts() {
 			long currentMillis = System.currentTimeMillis();
 					
 			if (getRemainingMillisToConnectionTimeout(currentMillis) <= 0) {
 			}
 					
 			if (getRemainingMillisToIdleTimeout(currentMillis) <= 0) {
 			}
 		}


 
 		}
 
 		
 		private long getRemainingMillisToConnectionTimeout(long currentMillis) {
 			return  - currentMillis;
 		}


 
 		}
 		
 		
 		private long getRemainingMillisToIdleTimeout(long currentMillis) {
 			long remaining =  - currentMillis;
 			
 			// time out received
 			if (remaining > 0) {
 				return remaining;	
 				
 			// ... yes 
 			} else {
 				
 				// ... but check if meantime data has been received! 
 				return ( + ) - currentMillis;
 			}	
 		}
 		
 		
 		
 		public void setWriteTransferRate(int bytesPerSecondthrows ClosedChannelExceptionIOException {
 			throw new UnsupportedOperationException("setWriteTransferRate is not supported for a pipeline. perform this operation on the MultiplexedConnection");			
 		}
 		
 		}
 		
 		public void setHandler(IHandler hdlthrows IOException {
 				.fine("[" + getId() + "] set handler " + hdl);
 			}
 			
 			.set(PipelineHandlerAdapter.newInstance(hdl));
 			
 			if (available() > 0) {
 				onData(null);
 			}
 		}
 		
 		public IHandler getHandler() {
 			IHandler hdl = .get();
 			if (hdl == null) {
 				return null;
 			} else {
 				return ((PipelineHandlerAdapterhdl).getHandler();
 			}
 		}
 		
 		
 		public void setOption(String nameObject valuethrows IOException {
 			.warning("set option is vaild for all pipelines. Better use <MultiplexedcCnnection>.serOptions(String, Object)");
 			.setOption(namevalue);
		public Object getOption(String namethrows IOException {
			return .getOption(name);
		@SuppressWarnings("unchecked")
		public Map<StringClassgetOptions() {
		public int getLocalPort() {
		public int getRemotePort() {
		public void suspendRead() throws IOException {
		public void suspendReceiving() throws IOException {
		public boolean isReadSuspended() {
		public boolean isReceivingSuspended() {
		@SuppressWarnings("unchecked")
		public void resumeReceiving() throws IOException {
			 = false;
			onData(data.toArray(new ByteBuffer[data.size()]));
		public void resumeRead() throws IOException {
		public int getPendingWriteDataSize() {
		public void activateSecuredMode() throws IOException {
			throw new UnsupportedOperationException("activateSecuredMode is not supported for a pipeline. perform this operation on the MultiplexedConnection");			
		public boolean isSecuredModeActivateable() {
		public boolean isSecure() {
			int chunkSize = (IntegergetOption();
			long transfered = 0;
			int read = 0;
			do {
				ByteBuffer transferBuffer = ByteBuffer.allocate(chunkSize);
				read = sourceChannel.read(transferBuffer);
				if (read > 0) { 
					if (transferBuffer.remaining() == 0) {
						transferBuffer.flip();
						write(transferBuffer);
else {
						transferBuffer.flip();
						write(transferBuffer.slice());
					transfered += read;
while (read > 0);
			return transfered;
			if (isAutoflush()) {
		public void flush() throws ClosedChannelExceptionIOException {
			ByteBuffer[] dataToWrite = drainWriteQueue();
			sendPipelineData(getId(), dataToWritegetFlushmode());
		}


		public String toString() {
			if (isOpen()) {
				return "id=" + getId() + ", remote=" + getRemoteAddress().getCanonicalHostName() + "(" + getRemoteAddress() + ":" + getRemotePort() + ")";
else {
				return "id=" + getId() + " (closed)";
	private static final class WatchDogTask extends TimerTask {
		public WatchDogTask(NonBlockingPipeline nonBlockingPipeline) {
		public void run() {
			if (nonBlockingPipeline == null)  {
				this.cancel();
else {
				nonBlockingPipeline.checkTimeouts();
		}