Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
COOS - Connected Objects Operating System (www.connectedobjects.org). Copyright (C) 2009 Telenor ASA and Tellu AS. All rights reserved. DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. This library is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. You may also contact one of the following for additional information: Telenor ASA, Snaroyveien 30, N-1331 Fornebu, Norway (www.telenor.no) Tellu AS, Hagalokkveien 13, N-1383 Asker, Norway (www.tellu.no)
  
  package org.coos.messaging.impl;
  
  import java.util.Vector;
  

Author(s):
Knut Eilif Husa, Tellu AS
anders
  
  public abstract class DefaultEndpoint extends DefaultProcessor implements Endpoint {
  
  	private String uri;
  	private String endpointUuid;
  	private Plugin plugin;
  	private Vector aliases = new Vector();
  	private Hashtable outLinks = new Hashtable();
  	private Vector services = new Vector();
  	private Hashtable exchanges = new Hashtable();
  	private Hashtable callbacks = new Hashtable();
  	private UuidGenerator uuidGenerator = new UuidGenerator();
  	private int maxPoolSize = ;
  	protected ExecutorService threadPool;
  	private long timeout;
  	protected Vector deferQueue = new Vector();
  	protected Log log = LogFactory.getLog(this.getClass().getName());
  
  
  	private String securityToken;
  	private String endpointState = ;
  	private Hashtable childStates = new Hashtable();
  
  	protected DefaultEndpoint() {
  	}
  
  	public DefaultEndpoint(String uriProcessor processor) {
  		// This is for test purposes since this constructor only is called in
  		// tests
  		this. = uri;
  		.put("coos"processor);
  		 = Executors.newThreadPool(2);
  		 = new UuidGenerator("xId-" + ((getName() == null) ? getEndpointUuid() : getName()));
  
  	}
 
 	public void setName(String name) {
 		if (name != null && !UuidHelper.isUuid(name)) {
 		}
 		super.setName(name);
 	}
 
 	public Consumer createConsumer() {
 		// TODO Auto-generated method stub
 		return null;
 	}
 
 	public Producer createProducer() {
 		// TODO Auto-generated method stub
 		return null;
 	}
 
 	public String getEndpointUuid() {
 		return ;
 	}
 
 	public void setEndpointUuid(String endpointUuid) {
 		this. = endpointUuid;
 		.putMDC("UUID"endpointUuid);
 	}
 
 	public String getEndpointState() {
 		return ;
 	}
 
 	public void setEndpointState(String endpointState) {
 		this. = endpointState;
 		if (isStateRunning()) {
 			for (int i = 0; i < .size(); i++) {
 				Message message = (Message.elementAt(i);
 				processMessage(message);
 			}
 		}
 	}
 
 	public void setChildEndpointState(String childNameString state) {
 		.put(childNamestate);
 	}
 
 	public String getChildEndpointState(String childName) {
 		return (String.get(childName);
 	}
 
 	public void setProperties(Hashtable properties) {
 		this. = properties;
 		String timoutStr = (Stringproperties.get();
 		if (timoutStr != null) {
 			 = Long.parseLong(timoutStr);
 		} else {
 		}
 
 		String maxPoolSizeStr = (Stringproperties.get();
 		if (maxPoolSizeStr != null) {
 			 = Integer.parseInt(maxPoolSizeStr);
 		} else {
 		}
 	}
 
 	public String getSecurityToken() {
 		return ;
 	}
 
 	public Exchange createExchange() {
 		return exchange;
 	}
 
 	public Exchange createExchange(ExchangePattern pattern) {
 		Exchange exchange = new DefaultExchange(pattern);		
 		return exchange;
 	}
 
 	public String getEndpointUri() {
 		return ;
 	}
 
 	public void setEndpointUri(String endpointUri) {
 		this. = endpointUri;
 		.putMDC("URI"endpointUri);
 	}
 
 	public String createUuid() {
 	}

Method that registers endpoint into lifecycle manager.

Throws:
org.coos.messaging.EndpointException if registration failes
 
 	private void registerLCM() throws EndpointException {
 		// registration to LCM
 
 		boolean failIfError = false;
 
 		if (lcmRegReq != null) {
 			if (lcmRegReq.equals("true")) {
 				failIfError = true;
 			} else
 				return// Skip registration
 		}
 		.info("Registering endpoint: " +  + " to LifeCycleManager.");
 		Message regMsg = LCMEdgeMessageFactory.createRegisterEndpointMessage(getChildStates(),
 		ex.setOutBoundMessage(regMsg);
 		ex = processExchange(ex);
 		if (ex.getFaultMessage() != null) {
 			if (failIfError)
 				throw new EndpointException("Registration to LifeCycleManager failed due to :"
 		} else {
 			Message reply = ex.getInBoundMessage();
 			String signalName = reply.getHeader(.);
 			if (signalName.equals(.)) {
 				if (failIfError)
 					throw new EndpointException("Registration to lifecycle manager failed");
 				// else
 				// log.
 			}
 		}
 	}
 
 	private void unRegisterLCM() throws EndpointException {
 		// unregistration to LCM
 		if (lcmRegReq == null || !lcmRegReq.equals("true")) {
 			return;
 		}
 		.info("UnRegistering endpoint: " +  + " from LifeCycleManager.");
 		Message regMsg = LCMEdgeMessageFactory.createUnregisterEndpointMessage();
 		ex.setOutBoundMessage(regMsg);
 		ex = processExchange(ex);
 		if (ex.getFaultMessage() != null) {
 			throw new EndpointException("Endpoint: " + getEndpointUri()
 					+ ", Unregistration to LifeCycleManager failed due to: "
 		}
 		Message reply = ex.getInBoundMessage();
 		String signalName = reply.getHeader(.);
 		if (signalName.equals(.)) {
 			throw new EndpointException("Endpoint: " + getEndpointUri()
 					+ ", Unregistration to lifecycle manager failed");
 		}
 
 	}

Method that reports state to LCM

Parameters:
childAddress
Throws:
org.coos.messaging.EndpointException
 
 	private void replyChildState(String childAddressthrows EndpointException {
 		.info("Reporting state: " +  + " to LifeCycleManager.");
 		Hashtable msgProps = new Hashtable();
 		ex.setOutBoundMessage(regMsg);
 		ex = processExchange(ex);
 		if (ex.getFaultMessage() != null) {
 			throw new EndpointException("Child State reply to LifeCycleManager failed due to :"
 		}
 		Message reply = ex.getInBoundMessage();
 		String signalName = reply.getHeader(.);
 		if (signalName.equals(.)) {
 			throw new EndpointException("Child State reply to lifecycle manager failed");
 		}
 	}

Method that reports state to LCM

 
 	private void replyState() throws EndpointException {
 		.info("Reporting state: " +  + " to LifeCycleManager.");
 		Hashtable msgProps = new Hashtable();
 		regMsg.setBody(msgProps);
 		ex.setOutBoundMessage(regMsg);
 		ex = processExchange(ex);
 		if (ex.getFaultMessage() != null) {
 			throw new EndpointException("State reply to LifeCycleManager failed due to :"
 		}
 		Message reply = ex.getInBoundMessage();
 		String signalName = reply.getHeader(.);
 		if (signalName.equals(.)) {
 			throw new EndpointException("State reply to lifecycle manager failed");
 		}
 	}
 
 	private void replyChildren() throws EndpointException {
 		.info("Reporting state: " +  + " to LifeCycleManager.");
 		Hashtable msgProps = new Hashtable();
 		regMsg.setBody(msgProps);
 		ex.setOutBoundMessage(regMsg);
 		ex = processExchange(ex);
 		if (ex.getFaultMessage() != null) {
 			throw new EndpointException("Children reply to LifeCycleManager failed due to :"
 		}
 		Message reply = ex.getInBoundMessage();
 		String signalName = reply.getHeader(.);
 		if (signalName.equals(.)) {
 			throw new EndpointException("Children reply to lifecycle manager failed");
 		}
 	}
 
 	public void reportState() throws EndpointException {
 		boolean failIfError = false;
 
 		if (lcmRegReq != null) {
 			if (lcmRegReq.equals("true")) {
 				failIfError = true;
 			} else
 				return// Skip registration
 		}
 
 		.info("Sending state: " + getEndpointState() + " to LifeCycleManager.");
 		Message regMsg = LCMEdgeMessageFactory.createSetStateMessage(getEndpointState());
 
 		ex.setOutBoundMessage(regMsg);
 		ex = processExchange(ex);
 		if (ex.getFaultMessage() != null) {
 			if (failIfError)
 				throw new EndpointException("Pushing state to Lifecycle Mangager failed :"
 		} else {
 			Message reply = ex.getInBoundMessage();
 			String signalName = reply.getHeader(.);
 			if (signalName.equals(.)) {
 				if (failIfError)
 					throw new EndpointException("Pushing state to Lifecycle Manager failed.");
 				// else
 				// log.
 			}
 		}
 	}
 
 	public void reportChildState(String childNamethrows EndpointException {
 		boolean failIfError = false;
 
 		if (lcmRegReq != null) {
 			if (lcmRegReq.equals("true")) {
 				failIfError = true;
 			} else
 				return// Skip registration
 		}
 		String childState = (StringgetChildStates().get(childName);
 
 		.info("Sending state: " + childState + " of child: " + childName + " to LifeCycleManager.");
 		Message regMsg = LCMEdgeMessageFactory.createRegisterEndpointChildMessage(childNamechildState);
 		ex.setOutBoundMessage(regMsg);
 		ex = processExchange(ex);
 		if (ex.getFaultMessage() != null) {
 			if (failIfError)
 				throw new EndpointException("Pushing child-state to Lifecycle Mangager failed :"
 		} else {
 			Message reply = ex.getInBoundMessage();
 			String signalName = reply.getHeader(.);
 			if (signalName.equals(.)) {
 				if (failIfError)
 					throw new EndpointException("Pushing child-state to Lifecycle Manager failed.");
 				// else
 				// log.
 			}
 		}
 	}

Method that logs the endpoint onto the Bus. The login required property must be set to true for the endpoint to login. Upon a successful login a security token will be issued

 
 	private void login() throws EndpointException {
 		// login
 		if (loginReq == null || !loginReq.equals("true")) {
 			return;
 		}
 		if (loginName == null) {
 			throw new EndpointException("No loginName defined");
 		}
 		if (password == null) {
 			throw new EndpointException("No password defined");
 		}
 		.info("Login endpoint: " +  + " with user: " + loginName + " to AccessControlManager.");
 
 		Message loginMsg = ACMessageFactory.createLoginMsg(loginNamepassword);
 
 		// Message loginMsg = new DefaultMessage(
 		// ACMessageConstants.AC_REQUEST_LOGIN);
 		// Hashtable msgProps = new Hashtable();
 		// msgProps.put(ACMessageConstants.AC_PROP_USERNAME, loginName);
 		// msgProps.put(ACMessageConstants.AC_PROP_PASSWORD, password);
 
 		// loginMsg.setBody(msgProps);
 		ex.setOutBoundMessage(loginMsg);
 		ex = processExchange(ex);
 
 		..println("finished processing exchange in login()");
 		if (ex.getFaultMessage() != null) {
 			throw new EndpointException("Login failed due to :" + ex.getFaultMessage().getHeader(.));
 		}
 		Message reply = ex.getInBoundMessage();
 		String signalName = reply.getHeader(.);
 			Hashtable msgProps = reply.getBodyAsProperties();
 			if ( == null) {
 				String errorMessage = (StringmsgProps.get(.);
 				throw new EndpointException("Login failed due to :" + errorMessage);
 			}
 		}
 
 	}

Sync processing of inBound messages

Parameters:
exchange
Returns:
 
 	public Exchange processExchange(Exchange exchange) {
 		.debug("Endpoint: " +  + ", Processing outgoing exchange: " + exchange);
 		Processor processor = prepareExchange(exchange);
 		if (exchange.getFaultMessage() != null) {
 			return exchange;
 		}
 		synchronized (exchange) {
 			try {
 				processor.processMessage(exchange.getOutBoundMessage());
 			} catch (ProcessorException e) {
 				Message fault = new DefaultMessage();
 				exchange.setFaultMessage(fault);
 				exchange.setException(e);
 				return exchange;
 			}
 
 			if (exchange.getPattern().equals(.)) {
 				try {
 					exchange.wait();
 					if (exchange.getInBoundMessage() == null && exchange.getFaultMessage() == null) {
 						// must fix here
 						..println("FAULT2");
 						Message fault = new DefaultMessage();
 						fault.setHeader(."Exchange timeout");
 						exchange.setFaultMessage(fault);
 					}
 				} catch (InterruptedException e) {
 				}
 			}
 		}
 		return exchange;
 	}

Prepares the exchange before it is processed. This includes checking that sender and receiver adresses are resolved

Parameters:
exchange the exchange
 
 	private Processor prepareExchange(Exchange exchange) {
 		try {
 
 			if (!isStateRunning()) {
 				throw new EndpointException("Endpoint not able to handle exchanges. Endpoint State is: "
 			}
 
 			String senderUri = exchange.getOutBoundMessage().getSenderEndpointUri();
 			URIHelper helper;
 
 			if (senderUri == null || senderUri.equals("")) {
 				helper = new URIHelper();
 			} else {
 				helper = new URIHelper(senderUri);
 			}
 
 						
 			if(exchange.getExchangeId() == null){
 			}
 			
 			if (exchange.getPattern().equals(.)) {
 				.put(exchange.getExchangeId(), exchange);
 			}
 			
 			if(exchange.getOutBoundMessage().getHeader(.) == null){
 				} else {
 				}
 			}
 						
 			if (exchange.getOutBoundMessage().getType() == null) {
 			}
 			
 
 			String protocol = URIProtocolHelper.getProtocol(uri);
 
 			Processor processor = resolveOutgoingProcessor(protocol);
 
 			if (protocol.equals("coos")) {
 
 				helper = new URIHelper(uri);
 						((getName() == null) ? getEndpointUuid() : getName()));
 
 				if ( != null) {
 				}
 			}
 
 			return processor;
 
 		} catch (Exception e) {
 		}
 
 		return null;
 
 	}
 
 	private boolean isStateRunning() {
 			return false;
 		return true;
 	}
 
 	protected Processor resolveOutgoingProcessor(String protocolthrows EndpointException {
 		Processor processor = (Processor.get(protocol);
 
 		if (processor == null) {
 			throw new EndpointException("No channel defined for protocol: " + protocol);
 		}
 		return processor;
 	}

Async processing of outbound messages

Parameters:
exchange
callback
 
 	public void processExchange(Exchange exchangeAsyncCallback callback) {
 		Processor processor = prepareExchange(exchange);
 		.debug("Endpoint: " +  + ", Processing outgoing exchange: " + exchange);
 		if (exchange.getFaultMessage() != null) {
 			callback.processExchange(exchange);
 			return;
 		}
 		.put(exchange.getExchangeId(), exchange);
 		.put(exchange.getExchangeId(), callback);
 		synchronized (exchange) {
 			try {
 				processor.processMessage(exchange.getOutBoundMessage());
 			} catch (ProcessorException e) {
 				Message fault = new DefaultMessage();
 
 			}
 		}
 	}

Processing of Inbound messages

Parameters:
msg
 
 	public void processMessage(Message msg) {
 		String msgType = msg.getHeader(.);
 		if (msgType.equals(.) || msgType.equals(.)) {
 				msg = new DefaultNotification(msg);
 			}
 			ExchangePattern xp = new ExchangePattern(xpattern);
 			final String xId = msg.getHeader(.);
 			final Message retMsg = msg;
 			try {
 				retMsg.getBody();
 			} catch (Exception e) {
 				retMsg.setHeader(."Deserialization error: " + e.getMessage());
 			}
 			Consumer consumer = createConsumer();
 			// Error handling
 				if (.containsKey(xId)) {
 
 						public void run() {
 							Exchange exchange = (Exchange.remove(xId);
 							.warn("Endpoint: " +  + ", Processing incoming exchange: " + exchange + ": fault :"
 									+ retMsg);
 							synchronized (exchange) {
 								exchange.setFaultMessage(retMsg);
 								if (.containsKey(xId)) {
 									// this is an async return
 									callback.processExchange(exchange);
 								} else {
 									// this is a sync return
 									exchange.notify();
 								}
 							}
 						}
 					});
 				} else {
 					.warn("Endpoint: " +  + ", Error message:" + retMsg);
 				}
 
 			} else {
 				// Message handling
 				Exchange exchange = null;
 				if (.containsKey(xId) && !xp.isOutBoundInitiated()) {
 					// this is a return or a message to same endpoint.
 
 						public void run() {
 							Exchange exchange2 = (Exchange.remove(xId);
 							.debug("Endpoint: " +  + ", Processing incoming exchange: " + exchange2);
 							synchronized (exchange2) {
 								exchange2.setInBoundMessage(retMsg);
 								if (.containsKey(xId)) {
 									// this is an async return
 									callback.processExchange(exchange2);
 								} else {
 									// this is a sync return
 									exchange2.notify();
 								}
 							}
 						}
 					});
 
 				} else if (consumer != null) {
 							public void run() {
 								try {
 								} catch (EndpointException e) {
 								}
 							}
 						});
 						return;
 					} else if (msg.getHeader(.).equals(
 						final String childAddress = (Stringmsg.getBodyAsProperties().get(
 							public void run() {
 								try {
 									replyChildState(childAddress);
 								} catch (EndpointException e) {
 								}
 							}
 						});
 						return;
 							public void run() {
 								try {
 								} catch (EndpointException e) {
 								}
 							}
 						});
 						return;
 					}
 
 					// this is an initiating request, will only be handled in
 					// state running, else it will be defered to
 					// the endpoint reaches state running
 					if (checkDefer(msg))
 						return;
 
 					if (xpattern.equals(.)) {
 					} else {
 						// default behaviour is inOnly
 					}
 
 					if (exchange != null) {
 						exchange.setInBoundMessage(msg);
 						exchange.setExchangeId(xId);
 						final Exchange exchange1 = exchange;
 						final Consumer consumer1 = consumer;
 							public void run() {
 								// Todo might insert e semaphore here to control
 								// concurrent access to consumer
 								.debug("Endpoint: " +  + ", Processing incoming exchange: " + exchange1);
 								consumer1.process(exchange1);
 							}
 						});
 					}
 				}
 			}
 		}
 	}
 
 	protected boolean checkDefer(Message msg) {
 		if (!isStateRunning()) {
 			return true;
 		}
 		return false;
 	}
 
 	public Vector getServices() {
 		return ;
 	}
 
 	public final void initializeEndpoint() {
 		 = new UuidGenerator("xId-" + ((getName() == null) ? getEndpointUuid() : getName()));
 
 			public void run() {
 					try {
 						start();
 					} catch (Exception e) {
 					}
 				}
 				// Let the hanging thread in pluginchannel connect return
 				synchronized (DefaultEndpoint.this) {
 				}
 
 			}
 		});
 	}
 
 	protected void preStart() throws EndpointException {
 		.info("Starting endpoint: " + );
 
 		// login to the accessControl Manager
 		login();
 
 		// if (producer instanceof Service && !(producer instanceof Endpoint)) {
 		// services.addElement(producer);
 		// ((Service) producer).start();
 		// }
 	}
 
 	public void start() throws Exception {
 		// Override this method
 	}
 
 	protected void postStart() throws EndpointException {
 
 		// register to the life cycle manager
 
 		.info("Successfully started endpoint: " + );
 	}
 
 	public void stop() throws Exception {
 		// Override this method
 	}
 
 	public final void shutDownEndpoint() {
 
 		if (!isStateRunning()) {
 			return;
 		}
 		.info("Stopping endpoint: " + );
 
 		try {
 			stop();
 		} catch (Exception e) {
 		}
 
 		for (int i = 0; i < .size(); i++) {
 			Service service = (Service.elementAt(i);
 			try {
 				service.stop();
 			} catch (Exception e) {
 			}
 		}
 
 		// register to the life cycle manager
 		try {
 		} catch (EndpointException e) {
 		}
 
 		if ( != null// Null if endpoint was not started successfully
 			// (ie couldn't connect)
 
 	}
 
 
 	public void addLink(String protocolLink linkthrows ConnectingException {
 		.put(protocollink);
 
 	}
 
 	public void addAlias(String alias) {
 		if (alias != null && !UuidHelper.isUuid(alias)) {
 		}
 	}
 
 	public void removeAlias(String alias) {
 	}
 
 	public Vector getAliases() {
 		return ;
 	}
 
 	private void updateAliases() {
 		Link link = (Link.get("coos");
 		if (link != null) {
 			Message msg = new DefaultMessage("alias".);
 			msg.setReceiverEndpointUri("coos://" + link.getDestinationUuid());
 		}
 	}
 
 		return this;
 	}
 
 	public Link getLink(String id) {
 		return (Link.get(id);
 	}
 
 	public void removeLink(String id) {
 
 	}
 
 	public void removeLinkById(String linkId) {
 		// TODO Auto-generated method stub
 
 	}
 
 	public void subscribe(SubscriptionFilter filter) {
 		.debug("Endpoint: " +  + " subscribing: " + filter);
 		ex.setOutBoundMessage(filter);
 	}
 
 	public void unsubscribe() {
 		.debug("Endpoint: " +  + " unSubscribing all");
 	}
 
 	public void unsubscribe(SubscriptionFilter filter) {
 		.debug("Endpoint: " +  + " unSubscribing: " + filter);
 		ex.setOutBoundMessage(filter);
 	}
 
 	public void publish(Notification notification) {
 		.debug("Endpoint: " +  + " publishing: " + notification);
 		notification.setSenderEndpointUri();
 		ex.setOutBoundMessage(notification);
 	}
 
 	public void setChildStates(Hashtable childStates) {
 		this. = childStates;
 	}
 
 	public Hashtable getChildStates() {
 		return ;
 	}
 
 	public Hashtable getProperties() {
 		return ;
 	}
 
 	public void setLinkAliases(Vector regAliasesLink outlink) {
 		// Not implemented
 	}
 
 	public Plugin getPlugin() {
 		return ;
 	}
 
 	public void setPlugin(Plugin plugin) {
 		this. = plugin;
 	}