Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2002-2014 the original author or authors.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.springframework.messaging.tcp.reactor;
 
 
 
An implementation of org.springframework.messaging.tcp.TcpOperations based on the TCP client support of the Reactor project.

Author(s):
Rossen Stoyanchev
Since:
4.0
 
 public class ReactorTcpClient<P> implements TcpOperations<P> {
 
 	public static final Class<NettyTcpClientREACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
 
 
 	private final TcpClient<Message<P>, Message<P>> tcpClient;
 
 	private final Environment environment;


A constructor that creates a reactor.tcp.netty.NettyTcpClient with a reactor.event.dispatch.SynchronousDispatcher as a result of which network I/O is handled in Netty threads.

Also see the constructor accepting a pre-configured Reactor reactor.tcp.TcpClient.

Parameters:
host the host to connect to
port the port to connect to
codec the codec to use for encoding and decoding the TCP stream
 
 	public ReactorTcpClient(String hostint portCodec<BufferMessage<P>, Message<P>> codec) {
 
 		// Revisit in 1.1: is Environment still required w/ sync dispatcher?
 
 				.env(this.)
 				.codec(codec)
 				.connect(hostport)
 				.get();
 
 	}

A constructor with a pre-configured reactor.tcp.TcpClient.

NOTE: if the client is configured with a thread-creating dispatcher, you are responsible for shutting down the reactor.core.Environment instance with which the client is configured.

Parameters:
tcpClient the TcpClient to use
	public ReactorTcpClient(TcpClient<Message<P>, Message<P>> tcpClient) {
		Assert.notNull(tcpClient"'tcpClient' must not be null");
		this. = tcpClient;
		this. = null;
	}
	private static void checkReactorVersion() {
		Class<?> type = null;
		try {
			type = ReactorTcpClient.class.getClassLoader().loadClass("reactor.event.dispatch.BaseDispatcher");
			Assert.isTrue(Modifier.isPublic(type.getModifiers()),
					"Detected older version of reactor-tcp. Switch to 1.0.1.RELEASE or higher.");
		}
			// Ignore, must be 1.1+
		}
	}
	public ListenableFuture<Voidconnect(TcpConnectionHandler<P> connectionHandler) {
		Promise<TcpConnection<Message<P>, Message<P>>> promise = this..open();
		composeConnectionHandling(promiseconnectionHandler);
			protected Void adapt(TcpConnection<Message<P>, Message<P>> result) {
				return null;
			}
		};
	}
	public ListenableFuture<Voidconnect(final TcpConnectionHandler<P> connectionHandler,
			final ReconnectStrategy reconnectStrategy) {
		Assert.notNull(reconnectStrategy"ReconnectStrategy must not be null");
		Stream<TcpConnection<Message<P>, Message<P>>> stream =
				this..open(new Reconnect() {
					public Tuple2<InetSocketAddressLongreconnect(InetSocketAddress addressint attempt) {
						return Tuple.of(addressreconnectStrategy.getTimeToNextAttempt(attempt));
					}
				});
		composeConnectionHandling(streamconnectionHandler);
	}
			final TcpConnectionHandler<P> connectionHandler) {
		composable.when(Throwable.classnew Consumer<Throwable>() {
			public void accept(Throwable ex) {
				connectionHandler.afterConnectFailure(ex);
			}
		});
		composable.consume(new Consumer<TcpConnection<Message<P>, Message<P>>>() {
			public void accept(TcpConnection<Message<P>, Message<P>> connection) {
				connection.on().close(new Runnable() {
					public void run() {
						connectionHandler.afterConnectionClosed();
					}
				});
				connection.consume(new Consumer<Message<P>>() {
					public void accept(Message<P> message) {
						connectionHandler.handleMessage(message);
					}
				});
				connection.when(Throwable.classnew Consumer<Throwable>() {
					public void accept(Throwable t) {
						connectionHandler.handleFailure(t);
					}
				});
				connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection));
			}
		});
	}
	private Promise<VoidtoPromise(Stream<TcpConnection<Message<P>, Message<P>>> stream) {
		final Deferred<Void,Promise<Void>> deferred = Promises.<Void>defer().get();
		stream.consume(SingleUseConsumer.once(new Consumer<TcpConnection<Message<P>, Message<P>>>() {
			public void accept(TcpConnection<Message<P>, Message<P>> conn) {
				deferred.accept((Voidnull);
			}
		}));
		stream.when(Throwable.class, SingleUseConsumer.once(new Consumer<Throwable>() {
			public void accept(Throwable throwable) {
				deferred.accept(throwable);
			}
		}));
		return deferred.compose();
	}
		try {
			Promise<Voidpromise = this..close();
				protected Boolean adapt(Void result) {
					return true;
				}
			};
		}
		finally {
		}
	}


A ConfigurationReader that enforces the use of a SynchronousDispatcher.

The reactor.core.configuration.PropertiesConfigurationReader used by default automatically creates other dispatchers with thread pools that are not needed.

	private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
			return new ReactorConfiguration(Arrays.<DispatcherConfiguration>asList(), "sync"new Properties());
		}
	}
New to GrepCode? Check out our FAQ X