Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * COOS - Connected Objects Operating System (www.connectedobjects.org).
 *
 * Copyright (C) 2009 Telenor ASA and Tellu AS. All rights reserved.
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This library is free software: you can redistribute it and/or modify it under the terms of the
 * GNU Lesser General Public License as published by the Free Software Foundation, either version 3
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
 * even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along with this program.
 * If not, see <http://www.gnu.org/licenses/>.
 *
 * You may also contact one of the following for additional information:
 * Telenor ASA, Snaroyveien 30, N-1331 Fornebu, Norway (www.telenor.no)
 * Tellu AS, Hagalokkveien 13, N-1383 Asker, Norway (www.tellu.no)
 */
 
 package org.coos.messaging.impl;
 
 

 /**
  * @author Knut Eilif Husa, Tellu AS
  * @author anders
  */
 
 public abstract class DefaultEndpoint extends DefaultProcessor implements Endpoint {
 
     // Instance state of the endpoint. The collections below are the legacy synchronized
     // java.util types (Vector / Hashtable) and are used raw, without type parameters.
     private String uri;
     private String endpointUuid;
     private Plugin plugin;
     private Vector aliases = new Vector();
     private Hashtable outLinks = new Hashtable();
     private Vector services = new Vector();
     private Hashtable exchanges = new Hashtable();
     private Hashtable callbacks = new Hashtable();
     private UuidGenerator uuidGenerator = new UuidGenerator();
     // NOTE(review): the initializer value was lost in this copy of the file; restore it from upstream.
     private int maxPoolSize = ;
     protected ExecutorService threadPool;
     private long timeout;
     protected Vector deferQueue = new Vector();
     // Logger is named after the concrete runtime class of the endpoint.
     protected Log log = LogFactory.getLog(this.getClass().getName());
 
 
     // NOTE(review): the initializer value was lost in this copy of the file; restore it from upstream.
     private String endpointState = ;
     private Hashtable childStates = new Hashtable();
     protected boolean heartbeat;
     protected Timer timer;
 
     protected DefaultEndpoint() {
     }
 
     public DefaultEndpoint(String uriProcessor processor) {
         // This is for test purposes since this constructor only is called in
         // tests
         this. = uri;
         .put("coos"processor);
          = Executors.newThreadPool(2);
          = new UuidGenerator("xId-" + ((getName() == null) ? getEndpointUuid() : getName()));
    }
    // Sets the endpoint name through super.setName(name). A non-null name that
    // UuidHelper.isUuid(...) rejects is additionally appended to a collection first.
    public void setName(String name) {
        if (name != null && !UuidHelper.isUuid(name)) {
            // NOTE(review): the receiver of addElement(...) was lost in this copy —
            // presumably the aliases vector; confirm against upstream.
            .addElement(name);
        }
        super.setName(name);
    }
    public Consumer createConsumer() {
        // TODO Auto-generated method stub
        return null;
    }
    public Producer createProducer() {
        // TODO Auto-generated method stub
        return null;
    }
    public String getEndpointUuid() {
        return ;
    }
    public void setEndpointUuid(String endpointUuid) {
    	endpointUuid = UuidHelper.getQualifiedUuid(endpointUuid);
        this. = endpointUuid;
        .putMDC("UUID"endpointUuid);
    }
    public String getEndpointState() {
        return ;
    }
    public void setEndpointState(String endpointState) {
        this. = endpointState;
        if (isStateRunning()) {
            for (int i = 0; i < .size(); i++) {
                Message message = (Message.elementAt(i);
                processMessage(message);
            }
            .removeAllElements();
        }
    }
    public void setChildEndpointState(String childNameString state) {
        .put(childNamestate);
    }
    public String getChildEndpointState(String childName) {
        return (String.get(childName);
    }
    // Stores the property table and derives a long value from one of its entries:
    // the entry is parsed with Long.parseLong when present, otherwise a fallback is assigned.
    // NOTE(review): the assignment targets, the property key and the fallback value were all
    // lost in this copy of the file — presumably the timeout field and its default; confirm.
    public void setProperties(Hashtable properties) {
        this. = properties;
        String timoutStr = (Stringproperties.get();
        if (timoutStr != null) {
            // Long.parseLong throws NumberFormatException for a non-numeric value.
             = Long.parseLong(timoutStr);
        } else {
             = ;
        }
    }
    // Creates a new DefaultExchange with a fixed exchange pattern.
    // NOTE(review): the pattern constant passed to ExchangePattern was lost in this copy.
    public Exchange createExchange() {
        Exchange exchange = new DefaultExchange(new ExchangePattern(.));		
        return exchange;
    }
    public Exchange createExchange(ExchangePattern pattern) {
        Exchange exchange = new DefaultExchange(pattern);		
        return exchange;
    }
    public String getEndpointUri() {
        return ;
    }
    public void setEndpointUri(String endpointUri) {
        this. = endpointUri;
        .putMDC("URI"endpointUri);
    }
    public String createUuid() {
        return .generateId();
    }

    
    /**
     * Method that registers endpoint into lifecycle manager.
     *
     * @throws org.coos.messaging.EndpointException if registration fails
     */
    private void registerLCM() throws EndpointException {
        // registration to LCM
        // Failures are only escalated to an EndpointException when the property read below
        // equals "true"; otherwise a fault or failed reply is silently ignored.
        // NOTE(review): this copy of the file has lost several tokens in this method (the
        // property keys, the logger and logged value, the default polling interval, the
        // request target and the header/constant names). Restore them from upstream.
        boolean failIfError = false;
        String lcmRegReq = (String.get();
        if (lcmRegReq != null && lcmRegReq.equals("true")) {
            failIfError = true;
        }
        .info("Registering endpoint: " +  + " to LifeCycleManager.");
        String pollingInterval=getProperty();
        Exchange ex;
        // Without a configured polling interval a default (lost here) is passed instead.
        if(pollingInterval==null)
            ex = LCMEdgeMessageFactory.createRegisterEndpointExchange(getChildStates(),);
        else
            ex = LCMEdgeMessageFactory.createRegisterEndpointExchange(getChildStates(),Long.parseLong(pollingInterval));
        InteractionHelper helper=new InteractionHelper(this);
        ex=helper.request(,ex);
        if (ex.getFaultMessage() != null) {
            if (failIfError)
                throw new EndpointException("Registration to LifeCycleManager failed due to :"
                        + ex.getFaultMessage().getHeader(.));
        } else {
            Message reply = ex.getInBoundMessage();
            // A missing reply is treated the same way as a reply carrying the failure marker.
            if (reply==null || reply.getHeader(.).equals(.)) {
                if (failIfError)
                    throw new EndpointException("Registration to lifecycle manager failed");
                // else
                // log.
            }
        }
    }
    // Sends an unregister request for this endpoint. A fault message, a missing reply or a
    // reply carrying the failure marker is only logged at info level; this method itself
    // contains no throw statement.
    // NOTE(review): logger references and header/constant names were lost in this copy.
    private void unRegisterLCM() throws EndpointException {
        // unregistration to LCM
        .info("UnRegistering endpoint: " +  + " from LifeCycleManager.");
        InteractionHelper helper=new InteractionHelper(this);
        Exchange ex= LCMEdgeMessageFactory.createUnregisterEndpointExchange();
        ex = helper.request(ex);
        if (ex.getFaultMessage() != null) {
            .info("Endpoint: " + getEndpointUri()
                    + ", Unregistration to LifeCycleManager failed due to: "
                    + ex.getFaultMessage().getHeader(.));
        }
        else {
            Message reply = ex.getInBoundMessage();
            if (reply==null || reply.getHeader(.).equals(.)) {
                .info("Endpoint: " + getEndpointUri()
                        + ", Unregistration to lifecycle manager failed");
            }
        }
    }
    // Sends the children states and throws EndpointException when the exchange carries a
    // fault message or when the reply's signal name equals the failure marker.
    // NOTE(review): logger reference, request target, factory argument and header/constant
    // names were lost in this copy.
    private void reportChildren() throws EndpointException {
        .info("Reporting state: " +  + " to LifeCycleManager.");
        Exchange ex=LCMEdgeMessageFactory.createSetChildrenStatesExchange();
        InteractionHelper helper=new InteractionHelper(this);
        ex=helper.request(,ex);
        if (ex.getFaultMessage() != null) {
            throw new EndpointException("Children reply to LifeCycleManager failed due to :"
                    + ex.getFaultMessage().getHeader(.));
        }
        Message reply = ex.getInBoundMessage();
        // NOTE(review): unlike registerLCM(), reply and signalName are dereferenced without a
        // null check here; a missing reply would raise NullPointerException.
        String signalName = reply.getHeader(.);
        if (signalName.equals(.)) {
            throw new EndpointException("Children reply to lifecycle manager failed");
        }
    }
    // Sends the current endpoint state (getEndpointState()). Failures raise EndpointException
    // only when the property read below equals "true"; otherwise they are ignored.
    // NOTE(review): property key, logger reference, request target and header/constant names
    // were lost in this copy.
    public void reportState() throws EndpointException {
        boolean failIfError = false;
        String lcmRegReq = (String.get();
        if (lcmRegReq != null && lcmRegReq.equals("true")) {
            failIfError = true;
        }
        .info("Sending state: " + getEndpointState() + " to LifeCycleManager.");
        Exchange ex = LCMEdgeMessageFactory.createSetStateExchange(getEndpointState());
        InteractionHelper helper=new InteractionHelper(this);
        ex=helper.request(,ex);
        if (ex.getFaultMessage() != null) {
            if (failIfError)
                throw new EndpointException("Pushing state to Lifecycle Mangager failed :"
                        + ex.getFaultMessage().getHeader(.));
        } else {
            Message reply = ex.getInBoundMessage();
            // NOTE(review): reply and signalName are dereferenced without a null check;
            // a missing reply would raise NullPointerException.
            String signalName = reply.getHeader(.);
            if (signalName.equals(.)) {
                if (failIfError)
                    throw new EndpointException("Pushing state to Lifecycle Manager failed.");
                // else
                // log.
            }
        }
    }
    // Sends the state recorded for one child (looked up in getChildStates() by childName).
    // Failures raise EndpointException only when the property read below equals "true".
    // NOTE(review): the signature lost its ") " before "throws", the cast lost its ")",
    // the factory call lost its comma, and the property key, logger reference, request
    // target and header/constant names are missing in this copy.
    public void reportChildState(String childNamethrows EndpointException {
        boolean failIfError = false;
        String lcmRegReq = (String.get();
        if (lcmRegReq != null && lcmRegReq.equals("true")) {
            failIfError = true;
        }
        String childState = (StringgetChildStates().get(childName);
        .info("Sending state: " + childState + " of child: " + childName + " to LifeCycleManager.");
        Exchange ex= LCMEdgeMessageFactory.createRegisterEndpointChildExchange(childNamechildState);
        InteractionHelper helper=new InteractionHelper(this);
        ex=helper.request(,ex);
        if (ex.getFaultMessage() != null) {
            if (failIfError)
                throw new EndpointException("Pushing child-state to Lifecycle Mangager failed :"
                        + ex.getFaultMessage().getHeader(.));
        } else {
            Message reply = ex.getInBoundMessage();
            // NOTE(review): reply and signalName are dereferenced without a null check.
            String signalName = reply.getHeader(.);
            if (signalName.equals(.)) {
                if (failIfError)
                    throw new EndpointException("Pushing child-state to Lifecycle Manager failed.");
                // else
                // log.
            }
        }
    }



    
    /**
     * Sync processing of inBound messages
     *
     * @param exchange the exchange to process
     * @return the same exchange instance that was passed in
     */
    public Exchange processExchange(Exchange exchange) {
        // An exchange that has already been processed is rejected: an exception is attached
        // and the exchange is returned unchanged otherwise.
    	if(exchange.isProcessed()){
    		exchange.setException(new EndpointException("Exchange can not be reused. Is already processed"));
    		.warn("Endpoint: " +  + ", Exception processing exchange, already processed: " + exchange);
    		return exchange;
    	}
        Processor processor = prepareExchange(exchange);
        .debug("Endpoint: " +  + ", Processing outgoing exchange: " + exchange);
        // A fault set during preparation ends processing early.
        // NOTE(review): prepareExchange() returns null on failure; if it did not also set a
        // fault message, processor.processMessage(...) below would dereference null.
        if (exchange.getFaultMessage() != null) {
            return exchange;
        }
        // The exchange object is the monitor that the calling thread waits on further down.
        synchronized (exchange) {
            try {
                processor.processMessage(exchange.getOutBoundMessage());
            } catch (ProcessorException e) {
                // Convert the processing failure into a fault message on the exchange.
                // NOTE(review): the header names/values were lost in this copy.
                Message fault = new DefaultMessage();
                fault.setReceiverEndpointUri(getEndpointUri());
                fault.setHeader(..);
                fault.setHeader(.e.getMessage());
                fault.setHeader(.exchange.getExchangeId());
                exchange.setFaultMessage(fault);
                exchange.setException(e);
                .warn("Endpoint: " +  + ", Exception processing exchange: " + exchange);
                return exchange;
            }
            ExchangePattern xp = exchange.getPattern();
            // Only the two patterns tested here (constants lost in this copy) block for a reply.
            if (xp.equals(.)||xp.equals(.)) {
                try {
                    // NOTE(review): the log message below reports a timeout in ms, so an
                    // argument to wait(...) was presumably lost in this copy; confirm.
                    exchange.wait();
                    // Woken without the exchange being marked processed: report a timeout fault.
                    if (!exchange.isProcessed()) {
                        .warn("Endpoint: " +  + ", exchange: "+exchange+" timed out ("++" ms).");
                        Message fault = new DefaultMessage();
                        fault.setReceiverEndpointUri(getEndpointUri());
                        fault.setHeader(..);
                        fault.setHeader(."Exchange timeout");
                        fault.setHeader(.exchange.getExchangeId());
                        exchange.setFaultMessage(fault);
                    }
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt is only printed; the thread's interrupt
                    // status is not restored.
                    e.printStackTrace();
                }
            }
        }
        return exchange;
    }

    
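    /**
     * Simple method to send a message to a receiver, using a set exchange pattern.
     * Returns the inbound Message object from the used exchange.
     *
     * @param msg the message to send
     * @param receiver coos url of the receiver
     * @param exchangePattern the exchange pattern to use
     * @return the inbound message of the exchange
     */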
    public Message sendMessage(Message msgString receiverString exchangePattern) {
        Exchange e = createExchange(new ExchangePattern(exchangePattern));
        msg.setReceiverEndpointUri(receiver);
        e.setOutBoundMessage(msg);
        Exchange response = processExchange(e);
        return response.getInBoundMessage();
    }

    
    /**
     * Prepares the exchange before it is processed. This includes checking that sender and
     * receiver addresses are resolved.
     *
     * @param exchange the exchange
     */
    // Fills in sender address, exchange id and headers on the outbound message and resolves
    // the processor for the receiver URI's protocol. Returns that processor, or null when any
    // exception occurs (the exception is then attached to the exchange).
    // NOTE(review): many constants, header names and field references were lost in this copy.
    private Processor prepareExchange(Exchange exchange) {
        try {
            //            if (!isStateRunning()&&!endpointState.equals(STATE_STARTING)) {
            //                throw new EndpointException("Endpoint not able to handle exchanges. Endpoint State is: "
            //                        + endpointState);
            //            }
            String senderUri = exchange.getOutBoundMessage().getSenderEndpointUri();
            URIHelper helper;
            // Start from the sender URI already on the message when there is one.
            if (senderUri == null || senderUri.equals("")) {
                helper = new URIHelper();
            } else {
                helper = new URIHelper(senderUri);
            }
            helper.setEndpoint();
            exchange.getOutBoundMessage().setSenderEndpointUri(helper.getEndpointUri());
            String uri = exchange.getOutBoundMessage().getReceiverEndpointUri();
            // Assign an exchange id only when the caller has not supplied one.
            if(exchange.getExchangeId() == null){
                exchange.setExchangeId(.generateId());
            }
            ExchangePattern xPattern = exchange.getPattern();
            // For the two patterns tested here the exchange is stored by id in a table.
            if (xPattern.equals(.) || xPattern.equals(.)) {
                .put(exchange.getExchangeId(), exchange);
            }
            if(exchange.getPattern().equals(.)){
                //If this is a reply we search through the message for header values that are to be passed
                //along with the reply
                exchange.getOutBoundMessage().setHeader(.exchange.getInBoundMessage().getHeader(.));
                Message outMsg = exchange.getOutBoundMessage();
                Message inMsg = exchange.getInBoundMessage();
                Enumeration keys = inMsg.getHeaders().keys();
                while (keys.hasMoreElements()) {
                    String key = (Stringkeys.nextElement();
                    // Only headers whose key starts with a given prefix (lost here) are copied.
                    if(key.startsWith(.)){
                        outMsg.setHeader(keyexchange.getInBoundMessage().getHeader(key));
                    }
                }
            } else {
                exchange.getOutBoundMessage().setHeader(.exchange.getExchangeId());
            }
            exchange.getOutBoundMessage().setHeader(.exchange.getPattern().toString());
            // A message without a type gets a default type header.
            if (exchange.getOutBoundMessage().getType() == null) {
                exchange.getOutBoundMessage().setHeader(..);
            }
            if(xPattern.equals(.)){
                // The header value is half of some duration (operand lost in this copy).
                long rdTimeout = /2;
                exchange.getOutBoundMessage().setHeader(., String.valueOf(rdTimeout));
                exchange.getOutBoundMessage().setHeader(.helper.getEndpointUri());
            }
            String protocol = URIProtocolHelper.getProtocol(uri);
            Processor processor = resolveOutgoingProcessor(protocol);
            // For coos protocols also set receiver/sender endpoint names; the sender name
            // falls back to the endpoint UUID when no name is set.
            if (protocol.startsWith("coos")) {
                helper = new URIHelper(uri);
                exchange.getOutBoundMessage().setReceiverEndpointName(helper.getEndpoint());
                exchange.getOutBoundMessage().setSenderEndpointName(
                        ((getName() == null) ? getEndpointUuid() : getName()));
            }
            return processor;
        } catch (Exception e) {
            e.printStackTrace();
            // NOTE(review): the statement on the next three lines is badly truncated in this
            // copy; it builds a text from the exception class name and message. Restore it
            // from upstream.
					."Exception: "
e.getClass().getName() 
", Message: "e.getMessage()));
            exchange.setException(e);
        }
        return null;
    }
    // Reports whether the endpoint counts as running: true on a match with the first state,
    // false for a group of other states, and true for anything else.
    // NOTE(review): this copy lost the compared field and all state constants, and the
    // opening of the second "if (" condition is missing before the "||" chain below.
    private boolean isStateRunning() {
        if (.equals()) {
            return true;
        }
                || .equals() || .equals()
                || .equals())
            return false;
        return true;
    }
    protected Processor resolveOutgoingProcessor(String protocolthrows EndpointException {
        Processor processor = (Processor.get(protocol);
        if (processor == null) {
            throw new EndpointException("No channel defined for protocol: " + protocol);
        }
        return processor;
    }

    
    /**
     * Async processing of outbound messages
     *
     * @param exchange the exchange to process
     * @param callback the callback that receives the exchange
     */
    // NOTE(review): the comma between the two parameters and the logger/table references
    // were lost in this copy of the file.
    public void processExchange(Exchange exchangeAsyncCallback callback) {
        .debug("Endpoint: " +  + ", Processing outgoing exchange: " + exchange);
        Processor processor = prepareExchange(exchange);
        // A fault raised during preparation is delivered to the callback immediately.
        if (exchange.getFaultMessage() != null) {
            callback.processExchange(exchange);
            return;
        }
        // Both the exchange and its callback are stored under the exchange id.
        .put(exchange.getExchangeId(), exchange);
        .put(exchange.getExchangeId(), callback);
        synchronized (exchange) {
            try {
                processor.processMessage(exchange.getOutBoundMessage());
            } catch (ProcessorException e) {
                // A send failure is turned into a fault message and fed back through
                // processMessage(...) rather than being thrown to the caller.
                Message fault = new DefaultMessage();
                fault.setHeader(..);
                fault.setHeader(.e.getMessage());
                fault.setHeader(.exchange.getExchangeId());
                fault.setHeader(.exchange.getPattern().toString());
                processMessage(fault);
            }
        }
    }

    
    /**
     * Processing of Inbound messages
     *
     * @param msg the inbound message
     */
    // NOTE(review): throughout this method the header names, type/pattern constants and the
    // logger, thread-pool and table references were lost in this copy; restore from upstream.
    public void processMessage(Message msg) {
        //if(coContainer != null) 
        String msgType = msg.getHeader(.);
        // Only two message types are handled; every other type is ignored.
        if (msgType.equals(.) || msgType.equals(.)) {
            if (msg.getName().equals(.)) {
                msg = new DefaultNotification(msg);
            }
            String xpattern = msg.getHeader(.);
            ExchangePattern xp = new ExchangePattern(xpattern);
            final String xId = msg.getHeader(.);
            final Message retMsg = msg;
            // Reading the body up front; a failure is recorded in the message headers as a
            // deserialization error instead of propagating.
            try {
                retMsg.getBody();
            } catch (Exception e) {
                retMsg.setHeader(..);
                retMsg.setHeader(."Deserialization error: " + e.getMessage());
            }
            Consumer consumer = createConsumer();
            // Error handling
            if (msg.getHeader(.).equals(.)) {
                if (.containsKey(xId)) {
                    final Exchange exchange = (Exchange.remove(xId);
                    .warn("Endpoint: " + 
                            + ", Processing incoming exchange: " + exchange
                            + ": fault :" + retMsg);
                    exchange.setFaultMessage(retMsg);
                    exchange.setProcessed(true);
                    if (.containsKey(xId)) {
                        .execute(new Runnable() {
                            public void run() {
                                // this is an async return
                                AsyncCallback callback = (AsyncCallback
                                .remove(xId);
                                callback.processExchange(exchange);
                            }
                        });
                    } else {
                        // this is a sync return
                        // todo when moved to java 1.6 can check if current thread holds lock
                        // New thread is only necessary in that case
                        new Thread(new Runnable() {
                            public void run() {
                                synchronized (exchange) {
                                    exchange.notifyAll();
                                }
                            }
                        }).start();
                    }
                } else {
                    .warn("Endpoint: " +  + ", Error message:" + retMsg);
                }
            } else {
                // Message handling
                Exchange exchange = null;
                if (.containsKey(xId) && !xp.isOutBoundInitiated()) {
                    // this is a return or a message to same endpoint.
                    final Exchange exchange2 = (Exchange.remove(xId);
                    .debug("Endpoint: " + 
                            + ", Processing incoming exchange: " + exchange2);
                    // The inbound message is attached only for one specific pattern.
                    if(xp.equals(.)){
                        exchange2.setInBoundMessage(retMsg);
                    }
                    exchange2.setProcessed(true);
                    if (.containsKey(xId)) {
                        .execute(new Runnable() {
                            public void run() {
                                // this is an async return
                                AsyncCallback callback = (AsyncCallback
                                .remove(xId);
                                callback.processExchange(exchange2);
                            }
                        });
                    } else {
                        // this is a sync return
                        // todo when moved to java 1.6 can check if current thread holds lock
                        // New thread is only necessary in that case
                        new Thread(new Runnable() {
                            public void run() {
                                synchronized (exchange2) {
                                    exchange2.notifyAll();
                                }
                            }
                        }).start();
                    }
                } else if (consumer != null) {
                    // Three request kinds are answered here without involving the consumer:
                    // report own state, report one child's state, report all children.
                    if (msg.getHeader(.).equals(.)) {
                        .execute(new Runnable() {
                            public void run() {
                                try {
                                    reportState();
                                } catch (EndpointException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                        return;
                    } else if (msg.getHeader(.).equals(
                            .)) {
                        final String childAddress = (Stringmsg.getBodyAsProperties().get(
                                .);
                        .execute(new Runnable() {
                            public void run() {
                                try {
                                    reportChildState(childAddress);
                                } catch (EndpointException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                        return;
                        // NOTE(review): a line is missing here in this copy — presumably the
                        // "} else if (...) {" that opens the report-children branch; as
                        // written the statement below follows a return directly.
                        .execute(new Runnable() {
                            public void run() {
                                try {
                                    reportChildren();
                                } catch (EndpointException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
                        return;
                    }
                    // this is an initiating request, will only be handled in
                    // state running, else it will be deferred to
                    // the endpoint reaches state running
                    if (checkDefer(msg))
                        return;
                    if (xpattern.equals(.)) {
                        exchange = createExchange(new ExchangePattern(.));
                    } else {
                        // default behaviour is inOnly
                        if(xpattern.equals(.)){
                            // Acknowledge receipt to the sender before handing the message on.
                            DefaultMessage confMsg = new DefaultMessage();
                            confMsg.setHeader(..);
                            Exchange e = createExchange(new ExchangePattern(.));
                            e.setExchangeId(msg.getHeader(.));
                            confMsg.setReceiverEndpointUri(msg.getHeader(.));
                            e.setOutBoundMessage(confMsg);
                            .debug("Endpoint: " +  + ", Sending robust ack: " + e);
                            processExchange(e);
                        }
                        exchange = createExchange(new ExchangePattern(.));
                    }
                    if (exchange != null) {
                        exchange.setInBoundMessage(msg);
                        exchange.setExchangeId(xId);
                        final Exchange exchange1 = exchange;
                        final Consumer consumer1 = consumer;
                        .execute(new Runnable() {
                            public void run() {
                                // Todo might insert e semaphore here to control
                                // concurrent access to consumer
                                .debug("Endpoint: " +  + ", Processing incoming exchange: " + exchange1);
                                consumer1.process(exchange1);
                            }
                        });
                    }
                }
            }
        }
    }
    // Decides whether msg must be held back: returns false when it can be handled now,
    // otherwise appends it to a queue and returns true.
    // NOTE(review): the opening "if (...) {" guarding the first return is missing in this
    // copy (the debug text suggests a check for state Running), as are the logger, the
    // logged state and the queue reference.
    protected boolean checkDefer(Message msg) {
            return false;
        }
        .debug("State isn't Running ("++"). Deferring message to queue.");
        .addElement(msg);
        return true;
    }
    public Vector getServices() {
        return ;
    }
    // Re-creates the id generator and thread pool, clears a collection and, when the endpoint
    // is in the expected state, runs preStart(), start() and postStart() in that order.
    // Any exception from those three calls is printed and a state is set instead of rethrowing.
    // NOTE(review): assignment targets, the pool-size argument, the cleared collection and
    // the state constants were lost in this copy.
    public final void initializeEndpoint() {
         = new UuidGenerator("xId-" + ((getName() == null) ? getEndpointUuid() : getName()));
         = Executors.newThreadPool();
        .removeAllElements();
        if (getEndpointState().equals()) {
            try {
                preStart();
                start();
                postStart();
            } catch (Exception e) {
                e.printStackTrace();
                setEndpointState();
            }
        }
    }
    // Logs that the endpoint is starting; a "(name=...)" suffix is added only when the
    // comparison below is false.
    // NOTE(review): the compared values, the logger and the logged identifiers were lost in
    // this copy.
    protected void preStart() throws EndpointException {
        String s = !.equals()?"(name="++")":"";
        .info("Starting endpoint: " +  + s);
        // if (producer instanceof Service && !(producer instanceof Endpoint)) {
        // services.addElement(producer);
        // ((Service) producer).start();
        // }
    }
    public void start() throws Exception {
        // Override this method
    }
    // Runs after start(): calls createProducer(), then registers with the lifecycle manager
    // and starts the heartbeat unless the property read below is "false" (case-insensitive).
    // A missing property therefore enables registration.
    // NOTE(review): the property key, the flag assigned true, the logger and the logged
    // identifiers were lost in this copy.
    protected void postStart() throws EndpointException {
        createProducer();
        // register to the life cycle manager
        String lcmRegEnabled=getProperty();
        if(lcmRegEnabled==null || !lcmRegEnabled.equalsIgnoreCase("false")) {
            registerLCM();
            =true;
            startLCMHeartbeat();
        }
        String s = !.equals()?"(name="++")":"";
        .info("Successfully started endpoint: " +  + s);
    }
    // Creates a Timer and, for a positive delay, schedules a repeating task that submits
    // reportState() for execution.
    // NOTE(review): the Timer assignment target, the executor reference, the schedule(...)
    // commas and part of the getProperty(...) call were lost in this copy; "120000" is
    // presumably the default value of a two-argument getProperty — confirm.
    private void startLCMHeartbeat() {
         = new Timer();
        long delay=Long.parseLong(getProperty("120000"));
        if(delay>0) {
            TimerTask task=new TimerTask() {
                public void run() {
                    .execute(new Runnable() {
                        public void run() {
                            try {
                                reportState();
                            // NOTE(review): a failed heartbeat report is swallowed silently.
                            } catch (EndpointException e) {}
                        }
                    });
                }
            };
            .schedule(taskdelaydelay);
        }
    }
    public void stop() throws Exception {
        // Override this method
    }
    // Shuts the endpoint down; does nothing unless isStateRunning() is true. In order: calls
    // stop(), stops every element of a Service collection, cancels an object when non-null,
    // unregisters from the lifecycle manager unless disabled by property, and finally stops
    // another object when non-null. Exceptions along the way are printed and do not abort
    // the remaining steps.
    // NOTE(review): the logger, the iterated collection, the two null-checked references,
    // the property key and the ")" closing the last "if (" were lost in this copy.
    public final void shutDownEndpoint() {
        if (!isStateRunning()) {
            return;
        }
        .info("Stopping endpoint: " + );
        try {
            stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
        for (int i = 0; i < .size(); i++) {
            Service service = (Service.elementAt(i);
            try {
                service.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // unregister to the life cycle manager
        if(!=null)
            .cancel();
        String lcmRegEnabled=getProperty();
        if(lcmRegEnabled==null || !lcmRegEnabled.equalsIgnoreCase("false")) {
            try {
                unRegisterLCM();
            } catch (EndpointException e) {
                e.printStackTrace();
            }
        }
        if ( != null// Null if endpoint was not started successfully
            // (ie couldn't connect)
            .stop();
    }
    public void addLink(String protocolLink linkthrows ConnectingException {
        .put(protocollink);
    }
    public void addAlias(String alias) {
        if (alias != null 
                && !UuidHelper.isUuid(alias
                && UuidHelper.isValidAliasForUuid(aliasgetEndpointUuid())) {
            .addElement(alias);
            updateAliases();
        } else {
            .warn("Cannot assign this endpoint alias:" + alias 
                    + ". Only alias with segment 'dico', 'localcoos' or '" 
                    + UuidHelper.getSegmentFromEndpointNameOrEndpointUuid(getEndpointUuid()) + "' is allowed.");
        }
    }
    public void removeAlias(String alias) {
        .removeElement(alias);
        updateAliases();
    }
    public Vector getAliases() {
        return ;
    }
    // Sends an "alias" message to the destination of the link stored under "coos";
    // does nothing when no such link is registered.
    // NOTE(review): the link table reference, the second DefaultMessage argument, the body
    // argument (presumably the alias list) and the exchange pattern were lost in this copy.
    private void updateAliases() {
        Link link = (Link.get("coos");
        if (link != null) {
            Message msg = new DefaultMessage("alias".);
            msg.setReceiverEndpointUri("coos://" + link.getDestinationUuid());
            msg.setBody();
            Exchange ex = createExchange(new ExchangePattern(.));
            ex.setOutBoundMessage(msg);
            processExchange(ex);
        }
    }
    public Processor getDefaultProcessor() {
        return this;
    }
    public Link getLink(String id) {
        return (Link.get(id);
    }
    public void removeLink(String id) {
        .remove(id);
    }
    public void removeLinkById(String linkId) {
        // TODO Auto-generated method stub
    }
    // Sends the subscription filter as the outbound message of an exchange and returns
    // false when the exchange comes back with a fault message, true otherwise.
    // NOTE(review): the declaration of "ex" is missing in this copy (a line was lost),
    // as are the logger, the logged identifier and the sender URI argument.
    public boolean subscribe(SubscriptionFilter filter) {
        .debug("Endpoint: " +  + " subscribing: " + filter);
        filter.setSenderEndpointUri();
        ex.setOutBoundMessage(filter);
        processExchange(ex);
        if(ex.getFaultMessage() != null){
            return false;
        }
        return true;
    }
    // Sends an unsubscribe-all message through processExchange(...); the result is not checked.
    // NOTE(review): the declarations of "msg" and "ex" are missing in this copy (lines were
    // lost), as are the logger, the logged identifier and the sender URI argument.
    public void unsubscribe() {
        .debug("Endpoint: " +  + " unSubscribing all");
        msg.setSenderEndpointUri();
        ex.setOutBoundMessage(msg);
        processExchange(ex);
    }
    // Sends the given filter as an unsubscribe request through processExchange(...); the
    // result is not checked.
    // NOTE(review): the declaration of "ex" is missing in this copy (a line was lost),
    // as are the logger, the logged identifier and the sender URI argument.
    public void unsubscribe(SubscriptionFilter filter) {
        .debug("Endpoint: " +  + " unSubscribing: " + filter);
        filter.setSenderEndpointUri();
        ex.setOutBoundMessage(filter);
        processExchange(ex);
    }
    // Sends the notification as the outbound message of an exchange through
    // processExchange(...); the result is not checked.
    // NOTE(review): the declaration of "ex" is missing in this copy (a line was lost),
    // as are the logger, the logged identifier and the sender URI argument.
    public void publish(Notification notification) {
        .debug("Endpoint: " +  + " publishing: " + notification);
        notification.setSenderEndpointUri();
        ex.setOutBoundMessage(notification);
        processExchange(ex);
    }
    public void setChildStates(Hashtable childStates) {
        this. = childStates;
    }
    public Hashtable getChildStates() {
        return ;
    }
    // Returns the property table of this endpoint.
    // NOTE(review): the returned identifier was lost in this copy. No matching field is
    // declared in the visible part of this class — presumably it is inherited; confirm.
    public Hashtable getProperties() {
        return ;
    }
    public void setLinkAliases(Vector regAliasesLink outlink) {
        // Not implemented
    }
    public Plugin getPlugin() {
        return ;
    }
    public void setPlugin(Plugin plugin) {
        this. = plugin;
    }