Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright 2002-2014 the original author or authors.
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package org.springframework.messaging.core;
 
 
 
A messaging template that resolves destination names to org.springframework.messaging.MessageChannel instances, which are then used to send and receive messages.

Author(s):
Mark Fisher
Rossen Stoyanchev
Since:
4.0
 
 		implements BeanFactoryAware {
 
 	// Timeout for send operations, in milliseconds (see setSendTimeout). Initialized to -1;
 	// NOTE(review): doSend appears to use the untimed send(message) variant for negative values - confirm.
 	private volatile long sendTimeout = -1;
 
 	// Timeout for receive operations, in milliseconds (see setReceiveTimeout). Initialized to -1;
 	// NOTE(review): doReceive appears to use the untimed receive() variant for negative values - confirm.
 	private volatile long receiveTimeout = -1;
 
 	// Whether a late reply should raise an exception in the replying thread; false initially.
 	private volatile boolean throwExceptionOnLateReply = false;


Configure the timeout value to use for send operations.

Parameters:
sendTimeout the send timeout in milliseconds
 
 	public void setSendTimeout(long sendTimeout) {
 		this. = sendTimeout;
 	}

Return the configured send operation timeout value.
 
 	public long getSendTimeout() {
 		return this.;
 	}

Configure the timeout value to use for receive operations.

Parameters:
receiveTimeout the receive timeout in milliseconds
 
 	public void setReceiveTimeout(long receiveTimeout) {
 		this. = receiveTimeout;
 	}

Return the configured receive operation timeout value.
 
 	public long getReceiveTimeout() {
 		return this.;
 	}

Whether the thread sending a reply should have an exception raised if the receiving thread isn't going to receive the reply either because it timed out, or because it already received a reply, or because it got an exception while sending the request message.

The default value is false, in which case only a WARN message is logged. If set to true, an org.springframework.messaging.MessageDeliveryException is raised in addition to the log message.

Parameters:
throwExceptionOnLateReply whether to throw an exception or not
 
 	public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
 		this. = throwExceptionOnLateReply;
 	}
 
 	public void setBeanFactory(BeanFactory beanFactorythrows BeansException {
		super.setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory));
	}
	protected final void doSend(MessageChannel channelMessage<?> message) {
		Assert.notNull(channel"'channel' is required");
		long timeout = this.;
		boolean sent = (timeout >= 0 ? channel.send(messagetimeout) : channel.send(message));
		if (!sent) {
			throw new MessageDeliveryException(message,
					"failed to send message to channel '" + channel + "' within timeout: " + timeout);
		}
	}
	protected final Message<?> doReceive(MessageChannel channel) {
		Assert.notNull(channel"'channel' is required");
		Assert.state(channel instanceof PollableChannel"A PollableChannel is required to receive messages");
		long timeout = this.;
		Message<?> message = (timeout >= 0 ?
				((PollableChannelchannel).receive(timeout) : ((PollableChannelchannel).receive());
		if (message == null && this..isTraceEnabled()) {
			this..trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
		}
		return message;
	}
	protected final Message<?> doSendAndReceive(MessageChannel channelMessage<?> requestMessage) {
		Assert.notNull(channel"'channel' is required");
		Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
		Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
		TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel();
		requestMessage = MessageBuilder.fromMessage(requestMessage).setReplyChannel(tempReplyChannel).
				setErrorChannel(tempReplyChannel).build();
		try {
			doSend(channelrequestMessage);
		}
		catch (RuntimeException ex) {
			tempReplyChannel.setSendFailed(true);
			throw ex;
		}
		Message<?> replyMessage = this.doReceive(tempReplyChannel);
		if (replyMessage != null) {
			replyMessage = MessageBuilder.fromMessage(replyMessage)
					.setHeader(.originalReplyChannelHeader)
					.setHeader(.originalErrorChannelHeader)
		}
		return replyMessage;
	}


A temporary channel for receiving a single reply message.
	private class TemporaryReplyChannel implements PollableChannel {
		private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
		private final CountDownLatch replyLatch = new CountDownLatch(1);
		private volatile Message<?> replyMessage;
		private volatile boolean hasReceived;
		private volatile boolean hasTimedOut;
		private volatile boolean hasSendFailed;
		public void setSendFailed(boolean hasSendError) {
			this. = hasSendError;
		}
		public Message<?> receive() {
			return this.receive(-1);
		}
		public Message<?> receive(long timeout) {
			try {
					this. = true;
				}
				else {
						this. = true;
					}
					else {
						this. = true;
					}
				}
			}
			catch (InterruptedException e) {
			}
			return this.;
		}
		public boolean send(Message<?> message) {
			return this.send(message, -1);
		}
		public boolean send(Message<?> messagelong timeout) {
			this. = message;
			boolean alreadyReceivedReply = this.;
			String errorDescription = null;
			if (this.) {
				errorDescription = "Reply message received but the receiving thread has exited due to a timeout";
			}
			else if (alreadyReceivedReply) {
				errorDescription = "Reply message received but the receiving thread has already received a reply";
			}
			else if (this.) {
				errorDescription = "Reply message received but the receiving thread has exited due to " +
						"an exception while sending the request message";
			}
			if (errorDescription != null) {
					.warn(errorDescription + ":" + message);
				}
					throw new MessageDeliveryException(messageerrorDescription);
				}
			}
			return true;
		}
	}
New to GrepCode? Check out our FAQ X