Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright (C) FuseSource, Inc. http://fusesource.com Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package org.fusesource.fabric.dosgi.impl;
 
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 
 import static org.osgi.service.remoteserviceadmin.RemoteConstants.*;
 
 
     // NOTE(review): several declarations in this section are truncated in this copy of the
     // file (missing '>' / ',' in generic types, and at least one declaration is absent after
     // the "Remote endpoints" comment). Restore them from the original source before compiling.

     // String constant "fabric-dosgi"; presumably the remote-service configuration type
     // handled by this manager -- TODO confirm against doExportService.
     public static final String CONFIG = "fabric-dosgi";
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Manager.class);
     // Presumably the ZooKeeper path under which endpoints are published -- TODO confirm.
     private static final String DOSGI_REGISTRY = "/fabric/dosgi";
     // Presumably the endpoint property key holding the remote address -- TODO confirm.
     private static final String FABRIC_ADDRESS = "fabric.address";
 
     private final BundleContext bundleContext;
 
     // Registration handle assigned in init() from a registerService(...) call.
     private ServiceRegistration registration;
 
     //
     // Discovery part
     //
 
     // The zookeeper client
     private final IZKClient zooKeeper;
     // The tracked zookeeper tree
     private ZooKeeperTreeTracker<Stringtree;
     // Remote endpoints
 
     //
     // Internal data structures
     //
     private final DispatchQueue queue;
 
    // Listener -> filter map; populated in added() and cleared per-listener in removed().
    private final Map<ListenerInfoSimpleFilterlisteners;
    private String uuid;
    // Constructor argument; the two-argument constructor passes "tcp://0.0.0.0:2543".
    private final String uri;
    // Constructor argument; the two-argument constructor passes null.
    private final String exportedAddress;
    // Constructor argument; the two-argument constructor passes a toMillis(5) value,
    // so this is presumably in milliseconds -- TODO confirm the unit.
    private final long timeout;
    private ClientInvoker client;
    private ServerInvoker server;
    /**
     * Convenience constructor that delegates to the five-argument constructor with the
     * literal URI "tcp://0.0.0.0:2543", a null exported address and a timeout obtained
     * from a toMillis(5) call.
     *
     * NOTE(review): the time-unit receiver of toMillis(5) and the argument separators are
     * missing in this copy of the file.
     */
    public Manager(BundleContext contextIZKClient zooKeeperthrows Exception {
        this(contextzooKeeper"tcp://0.0.0.0:2543"null..toMillis(5));
    }
    /**
     * Main constructor: creates the dispatch queue and the internal collections, then
     * stores the five constructor arguments. No invoker is started and ZooKeeper is not
     * touched in this constructor.
     *
     * NOTE(review): every assignment target ("this. = ...") is missing in this copy of the
     * file. The right-hand sides suggest the targets are the queue, the listeners map, a
     * CapabilitySet of endpoints, and the bundleContext / zooKeeper / uri / exportedAddress /
     * timeout fields -- TODO confirm against the original source.
     */
    public Manager(BundleContext contextIZKClient zooKeeperString uriString exportedAddresslong timeoutthrows Exception {
        this. = Dispatch.createQueue();
        this. = new ConcurrentHashMap<ListenerInfoSimpleFilter>();
        // Indexed set of endpoint descriptions; the indexed attribute names are missing here.
        this. = new CapabilitySet<EndpointDescription>(
                Arrays.asList(.), false);
        this. = context;
        this. = zooKeeper;
        this. = uri;
        this. = exportedAddress;
        this. = timeout;
    }
    /**
     * Starts the manager. In order: creates a ServerInvokerImpl and calls start() twice
     * (per the comment, on the client and the server), sets up a ZooKeeperTreeTracker and
     * tracks it with this object as listener, obtains a UUID via Utils.getUUID, registers
     * this object as a service listener with a "(<key>=*)" filter, registers it under the
     * ListenerHook, EventHook and FindHook class names, and finally exports every service
     * already matching the filter.
     *
     * NOTE(review): receivers, arguments and the body of the try block are missing in this
     * copy of the file; the statement order below should be preserved when restoring them.
     *
     * @throws Exception declared by the signature; the visible code only handles
     *         KeeperException.NodeExistsException locally
     */
    public void init() throws Exception {
        // Create client and server
        this. = new ServerInvokerImpl();
        this..start();
        this..start();
        // ZooKeeper tracking
        try {
            // NOTE(review): the guarded statement is missing here; presumably it creates the
            // registry node, since an already-existing node is tolerated below -- TODO confirm.
        } catch (KeeperException.NodeExistsException e) {
            // The node already exists, that's fine
        }
        this. = new ZooKeeperTreeTracker<String>(this.new ZKStringDataReader(), );
        this..track(this);
        // UUID
        this. = Utils.getUUID(this.);
        // Service listener filter
        // Matches any service that defines the (missing) property key, whatever its value.
        String filter = "(" + . + "=*)";
        // Initialization
        this..addServiceListener(thisfilter);
        // Service registration
        this. = this..registerService(new String[] { ListenerHook.class.getName(), EventHook.class.getName(), FindHook.class.getName() }, thisnull);
        // Check existing services
        ServiceReference[] references = this..getServiceReferences((Stringnullfilter);
        // The lookup result is null-checked before iterating.
        if (references != null) {
            for (ServiceReference reference : references) {
                exportService(reference);
            }
        }
    }
    /**
     * Shuts the manager down. In order: unregisters every imported service held in the
     * per-endpoint registration maps, un-exports every exported service reference, calls
     * stop() twice and destroy() once, unregisters a registration if it is non-null, and
     * removes this object as a service listener.
     *
     * NOTE(review): the receivers of values(), keySet(), stop(), destroy(), unregister()
     * and removeServiceListener() are missing in this copy of the file; presumably they are
     * the imported/exported service maps, the two invokers, the tree tracker, the hook
     * registration and the bundle context -- TODO confirm.
     */
    public void destroy() {
        for (Map<LongImportRegistrationregistrations : this..values()) {
            for (ImportRegistration registration : registrations.values()) {
                registration.getImportedService().unregister();
            }
        }
        // NOTE(review): unExportService() removes entries from a map; verify that the map
        // iterated here tolerates removal during iteration.
        for (ServiceReference reference : this..keySet()) {
            unExportService(reference);
        }
        this..stop();
        this..stop();
        this..destroy();
        if ( != null) {
            this..unregister();
        }
        this..removeServiceListener(this);
    }
    //
    // ServiceListener
    //
    /**
     * Dispatches a service event to exportService, updateService or unExportService
     * depending on the event type. Event types that match none of the three cases are
     * ignored (there is no default branch).
     *
     * NOTE(review): the three case constants are missing in this copy of the file;
     * presumably they are the registered / modified / unregistering event types, in that
     * order -- TODO confirm.
     *
     * @param event the service event; its service reference is passed to the handler
     */
    public void serviceChanged(final ServiceEvent event) {
        final ServiceReference reference = event.getServiceReference();
        switch (event.getType()) {
            case .:
                exportService(reference);
                break;
            case .:
                updateService(reference);
                break;
            case .:
                unExportService(reference);
                break;
        }
    }
    //
    // ListenerHook
    //
    /**
     * Handles newly added service listeners. For each listener that has a filter and does
     * not belong to the excluded bundle context, the listener's filter is wrapped in
     * "(&<filter>(!(<key>=<value>)))", parsed, stored for the listener, and every known
     * endpoint matching the wrapped filter is imported for that listener.
     *
     * NOTE(review): the excluded context, the key/value of the negated clause and the
     * receivers of put()/match() are missing in this copy of the file.
     *
     * @param listenerInfos raw collection, cast unchecked to Collection of ListenerInfo
     */
    @SuppressWarnings("unchecked")
    public void added(final Collection listenerInfos) {
        for (ListenerInfo listenerInfo : (Collection<ListenerInfo>) listenerInfos) {
            // Ignore our own listeners or those that don't have any filter
            if (listenerInfo.getBundleContext() ==  || listenerInfo.getFilter() == null) {
                continue;
            }
            // Make sure we only import remote services
            String filter = "(&" + listenerInfo.getFilter() + "(!(" +  + "=" + this. + ")))";
            SimpleFilter exFilter = SimpleFilter.parse(filter);
            .put(listenerInfoexFilter);
            // Iterate through known services and import them if needed
            Set<EndpointDescriptionmatches = .match(exFilter);
            for (EndpointDescription endpoint : matches) {
                doImportService(endpointlistenerInfo);
            }
        }
    }
    /**
     * Handles removed service listeners. For each listener that has a filter and does not
     * belong to the excluded bundle context, the stored filter is removed and every known
     * endpoint matching it has the listener's reference dropped from the import
     * registration of the listener's bundle. When a registration has no references left,
     * its imported service is unregistered and the registration is removed from the
     * per-endpoint map.
     *
     * NOTE(review): the excluded context and the receivers of remove()/match()/get() are
     * missing in this copy of the file.
     *
     * @param listenerInfos raw collection, cast unchecked to Collection of ListenerInfo
     */
    @SuppressWarnings("unchecked")
    public void removed(final Collection listenerInfos) {
        for (ListenerInfo listenerInfo : (Collection<ListenerInfo>) listenerInfos) {
            // Ignore our own listeners or those that don't have any filter
            if (listenerInfo.getBundleContext() ==  || listenerInfo.getFilter() == null) {
                continue;
            }
            // NOTE(review): exFilter is not null-checked; if this listener was never recorded
            // by added(), remove() presumably returns null -- verify that match() accepts it.
            SimpleFilter exFilter = .remove(listenerInfo);
            // Iterate through known services and dereference them if needed
            Set<EndpointDescriptionmatches = .match(exFilter);
            for (EndpointDescription endpoint : matches) {
                Map<LongImportRegistrationregistrations = .get(endpoint);
                if (registrations != null) {
                    // Registrations are keyed by the id of the listening bundle.
                    ImportRegistration registration = registrations.get(listenerInfo.getBundleContext().getBundle().getBundleId());
                    if (registration != null) {
                        registration.removeReference(listenerInfo);
                        // Last reference gone: withdraw the imported service for this bundle.
                        if (!registration.hasReferences()) {
                            registration.getImportedService().unregister();
                            registrations.remove(listenerInfo.getBundleContext().getBundle().getBundleId());
                        }
                    }
                }
            }
        }
    }
    //
    // EventHook
    //
    /**
     * Restricts which bundle contexts receive a service event. When the event's service
     * reference carries both checked properties, every context other than the reference's
     * own bundle context and one further (missing) context is removed from the supplied
     * collection. The collection is modified in place through its iterator.
     *
     * NOTE(review): the two property keys and the second allowed context are missing in
     * this copy of the file; the latter is presumably this manager's own bundle context --
     * TODO confirm.
     *
     * @param event      the service event being delivered
     * @param collection raw collection, cast unchecked to Collection of BundleContext
     */
    @SuppressWarnings("unchecked")
    public void event(ServiceEvent eventCollection collection) {
        // Our imported services are exported from within the importing bundle and should only be visible to it
        ServiceReference reference = event.getServiceReference();
        if (reference.getProperty() != null && reference.getProperty() != null) {
            Collection<BundleContextcontexts = (Collection<BundleContext>) collection;
            for (Iterator<BundleContextiterator = contexts.iterator(); iterator.hasNext();) {
                BundleContext context = iterator.next();
                // Contexts are compared by identity, not equals().
                if (context != reference.getBundle().getBundleContext() && context != this.) {
                    iterator.remove();
                }
            }
        }
    }
    //
    // FindHook
    //
    /**
     * Restricts which service references a lookup can see. Every reference that carries
     * both checked properties is removed from the supplied collection unless the
     * requesting context is the reference's own bundle context or one further (missing)
     * context. The collection is modified in place through its iterator.
     *
     * NOTE(review): the two property keys and the second allowed context are missing in
     * this copy of the file; the latter is presumably this manager's own bundle context --
     * TODO confirm. The name, filter and allServices arguments are not read by the
     * visible code.
     *
     * @param context    the bundle context performing the lookup
     * @param collection raw collection, cast unchecked to Collection of ServiceReference
     */
    @SuppressWarnings("unchecked")
    public void find(BundleContext contextString nameString filterboolean allServicesCollection collection) {
        // Our imported services are exported from within the importing bundle and should only be visible to it
        Collection<ServiceReferencereferences = (Collection<ServiceReference>) collection;
        for (Iterator<ServiceReferenceiterator = references.iterator(); iterator.hasNext();) {
            ServiceReference reference = iterator.next();
            if (reference.getProperty() != null && reference.getProperty() != null) {
                // Contexts are compared by identity, not equals().
                if (context != reference.getBundle().getBundleContext() && context != this.) {
                    iterator.remove();
                }
            }
        }
    }
    //
    // NodeEventsListener
    //
    /**
     * Processes a batch of tracked-tree node events. Events at depth 0 are skipped. For
     * the remaining events the endpoint description is rebuilt from the node data and,
     * depending on the event type, the endpoint is (1) added to the known capabilities and
     * imported for every stored listener whose filter matches, (2) used to refresh the
     * properties of its already-imported services, or (3) removed from the known
     * capabilities with all of its imported services unregistered.
     *
     * Any exception aborts processing of the remaining events in the batch and is logged
     * at info level; nothing is rethrown.
     *
     * NOTE(review): the three case constants and the receivers of addCapability(),
     * entrySet(), get(), removeCapability(), remove() and info() are missing in this copy
     * of the file; the cases presumably correspond to added / updated / deleted nodes --
     * TODO confirm.
     *
     * @param nodeEvents the events to process
     */
    public void onEvents(final Collection<NodeEvent<String>> nodeEvents) {
        try {
            for (NodeEvent<Stringevent : nodeEvents) {
                // Depth 0 events are ignored.
                if (event.getDepth() == 0) {
                    continue;
                }
                switch (event.getEventType()) {
                    case : {
                        EndpointDescription endpoint = Utils.getEndpointDescription(event.getData());
                        .addCapability(endpoint);
                        // Check existing listeners
                        for (Map.Entry<ListenerInfoSimpleFilterentry : .entrySet()) {
                            if (CapabilitySet.matches(endpointentry.getValue())) {
                                doImportService(endpointentry.getKey());
                            }
                        }
                    }
                    break;
                    case : {
                        EndpointDescription endpoint = Utils.getEndpointDescription(event.getData());
                        Map<LongImportRegistrationregistrations = .get(endpoint);
                        if (registrations != null) {
                            // Push the endpoint's current properties to each imported service.
                            for (ImportRegistration reg : registrations.values()) {
                                reg.importedService.setProperties(new Hashtable<StringObject>(endpoint.getProperties()));
                            }
                        }
                    }
                    break;
                    case : {
                        EndpointDescription endpoint = Utils.getEndpointDescription(event.getData());
                        .removeCapability(endpoint);
                        Map<LongImportRegistrationregistrations = .remove(endpoint);
                        if (registrations != null) {
                            for (ImportRegistration reg : registrations.values()) {
                                reg.getImportedService().unregister();
                            }
                        }
                    }
                    break;
                }
            }
        } catch (Exception e) {
            // NOTE(review): failures are only reported at info level -- consider whether a
            // higher level is intended.
            .info("Error when handling zookeeper events"e);
        }
    }
    //
    // Export logic
    //
    /**
     * Exports a service unless it is already recorded as exported. The export itself is
     * delegated to doExportService; a non-null result is stored under the reference.
     * Exceptions are caught and logged at info level, so a failed export leaves no entry
     * and is not reported to the caller.
     *
     * NOTE(review): the receivers of containsKey(), put() and info() are missing in this
     * copy of the file. The containsKey/put pair is not atomic -- verify whether callers
     * can invoke this concurrently for the same reference.
     *
     * @param reference the service to export
     */
    protected void exportService(final ServiceReference reference) {
        if (!.containsKey(reference)) {
            try {
                ExportRegistration registration = doExportService(reference);
                if (registration != null) {
                    .put(referenceregistration);
                }
            } catch (Exception e) {
                .info("Error when exporting endpoint"e);
            }
        }
    }
    /**
     * Placeholder for propagating a change of an exported service. The export
     * registration is looked up, but the try block contains only TODO comments, so this
     * method currently has no effect beyond the lookup.
     *
     * NOTE(review): the receivers of get() and info() are missing in this copy of the file.
     *
     * @param reference the modified service
     */
    protected void updateService(final ServiceReference reference) {
        ExportRegistration registration = .get(reference);
        if (registration != null) {
            try {
                // TODO: implement logic
                // TODO: need to reflect simple properties change, but also export
                // TODO: related properties like the exported interfaces
            } catch (Exception e) {
                .info("Error when updating endpoint"e);
            }
        }
    }
    /**
     * Withdraws an exported service. The export registration is removed from the
     * bookkeeping map; if one existed, the service is unregistered by its endpoint id and
     * the node returned by getZooKeeperNode() is deleted. Exceptions are caught and logged
     * at info level.
     *
     * NOTE(review): the receivers of remove(), unregisterService(), delete() and info()
     * are missing in this copy of the file. If unregisterService() throws, the delete()
     * call is skipped although the registration has already been removed from the map.
     *
     * @param reference the service to stop exporting; unknown references are a no-op
     */
    protected void unExportService(final ServiceReference reference) {
        try {
            ExportRegistration registration = .remove(reference);
            if (registration != null) {
                .unregisterService(registration.getExportedEndpoint().getId());
                .delete(registration.getZooKeeperNode());
            }
        } catch (Exception e) {
            .info("Error when unexporting endpoint"e);
        }
    }
    /**
     * Performs the actual export of a service. The service properties are copied into a
     * sorted map, intents and configuration types are validated, endpoint properties
     * (including a freshly generated UUID and a "scheme://host:port" address built from
     * the connect address) are added, the service is registered under the endpoint id with
     * a factory that gets/ungets it from its own bundle context, and the endpoint
     * description XML is published through a create(...) call.
     *
     * NOTE(review): almost every property key, several receivers and all argument
     * separators are missing in this copy of the file; the comments below describe only
     * what remains visible.
     *
     * @param reference the service to export
     * @return a new ExportRegistration built from the reference, the endpoint description
     *         and the path returned by the create(...) call
     * @throws UnsupportedOperationException if any intent is specified, or if
     *         configurations are specified and do not contain the accepted one
     * @throws Exception declared by the signature (e.g. from URI parsing or publishing)
     */
    protected ExportRegistration doExportService(final ServiceReference referencethrows Exception {
        // Compute properties
        Map<StringObjectproperties = new TreeMap<StringObject>(.);
        for (String k : reference.getPropertyKeys()) {
            properties.put(kreference.getProperty(k));
        }
        // Bail out if there are any intents specified, we don't support any
        Set<Stringintents = Utils.normalize(properties.get());
        Set<StringextraIntents = Utils.normalize(properties.get());
        if (!intents.isEmpty() || !extraIntents.isEmpty()) {
            throw new UnsupportedOperationException();
        }
        // Exactly one configuration is accepted: default to it when none is specified,
        // and bail out when configurations are specified but do not include it
        Set<Stringconfigs = Utils.normalize(properties.get());
        if (configs.isEmpty()) {
            configs.add();
        } else if (!configs.contains()) {
            throw new UnsupportedOperationException();
        }
        URI connectUri = new URI(this..getConnectAddress());
        // Scheme and port come from the connect address; the host part is missing here
        // (presumably the exported address -- TODO confirm).
        String fabricAddress = connectUri.getScheme() + "://" +  + ":" + connectUri.getPort();
        properties.remove();
        properties.put(new String[] {  });
        properties.put(this.);
        properties.put(fabricAddress);
        // A fresh UUID per export; it is also used as the last path segment when publishing.
        String uuid = UuidGenerator.getUUID();
        properties.put(uuid);
        // Now, export the service
        EndpointDescription description = new EndpointDescription(properties);
        // Export it
        .registerService(description.getId(), new ServerInvoker.ServiceFactory() {
            // The service object is obtained from, and released to, the exporting bundle's context.
            public Object get() {
                return reference.getBundle().getBundleContext().getService(reference);
            }
            public void unget() {
                reference.getBundle().getBundleContext().ungetService(reference);
            }
        }, AriesFrameworkUtil.getClassLoader(reference.getBundle()));
        String descStr = Utils.getEndpointDescriptionXML(description);
        // Publish in ZooKeeper
        final String nodePath = .create( + "/" + uuiddescStr.);
        // Return
        return new ExportRegistration(referencedescriptionnodePath);
    }
    //
    // Import logic
    //
    /**
     * Imports an endpoint on behalf of a listener. Import registrations are kept per
     * endpoint in a map keyed by bundle id. If the listener's bundle has no registration
     * for this endpoint yet, a service is registered in that bundle's context under the
     * endpoint's interfaces, backed by a Factory and carrying the endpoint's properties.
     * The listener is then added as a reference to the registration.
     *
     * NOTE(review): the receivers of get(), put() and getBundle() are missing in this copy
     * of the file. The get-then-put on the outer map is not atomic and the inner map is a
     * plain HashMap -- verify the threading assumptions of the callers.
     *
     * @param endpoint the remote endpoint to import
     * @param listener the listener that is interested in the endpoint
     * @return the new or existing registration for the listener's bundle
     */
    protected ImportRegistration doImportService(final EndpointDescription endpointfinal ListenerInfo listener) {
        Map<LongImportRegistrationregistrations = .get(endpoint);
        if (registrations == null) {
            registrations = new HashMap<LongImportRegistration>();
            .put(endpointregistrations);
        }
        ImportRegistration reg = registrations.get(listener.getBundleContext().getBundle().getBundleId());
        if (reg == null) {
            // The service is registered through the listening bundle's own context.
            Bundle bundle = .getBundle(listener.getBundleContext().getBundle().getBundleId());
            ServiceRegistration registration = bundle.getBundleContext().registerService(
                    endpoint.getInterfaces().toArray(new String[endpoint.getInterfaces().size()]),
                    new Factory(endpoint),
                    new Hashtable<StringObject>(endpoint.getProperties())
            );
            reg = new ImportRegistration(registrationendpoint);
            registrations.put(listener.getBundleContext().getBundle().getBundleId(), reg);
        }
        reg.addReference(listener);
        return reg;
    }
    /**
     * Accessor returning a DispatchQueue.
     *
     * NOTE(review): the returned expression is missing in this copy of the file;
     * presumably it is the queue created in the constructor -- TODO confirm.
     */
    public DispatchQueue queue() {
        return ;
    }
    /**
     * Service factory used for imported services: for each requesting bundle it builds a
     * java.lang.reflect.Proxy implementing the endpoint's interfaces, backed by an
     * InvocationHandler obtained from a getProxy(...) call.
     *
     * NOTE(review): the field name in the constructor assignment, the receivers of
     * getInterfaces()/getProperties()/getProxy() and the property key are missing in this
     * copy of the file.
     */
    class Factory implements ServiceFactory {
        private final EndpointDescription description;
        Factory(EndpointDescription description) {
            this. = description;
        }
        /**
         * Creates the proxy for the given bundle. Interfaces that cannot be loaded by the
         * bundle's class loader are silently left out of the proxy.
         */
        public Object getService(Bundle bundleServiceRegistration registration) {
            ClassLoader classLoader = AriesFrameworkUtil.getClassLoader(bundle);
            List<Classinterfaces = new ArrayList<Class>();
            for (String interfaceName : .getInterfaces()) {
                try {
                    interfaces.add(classLoader.loadClass(interfaceName));
                } catch (ClassNotFoundException e) {
                    // Ignore
                    // NOTE(review): if no interface can be loaded, the proxy is created with
                    // an empty interface array -- confirm this is acceptable.
                }
            }
            String address = (String.getProperties().get();
            InvocationHandler handler = .getProxy(address.getId(), classLoader);
            return Proxy.newProxyInstance(classLoaderinterfaces.toArray(new Class[interfaces.size()]), handler);
        }
        /** Intentionally empty: the visible code releases nothing when a proxy is returned. */
        public void ungetService(Bundle bundleServiceRegistration registrationObject service) {
        }
    }
New to GrepCode? Check out our FAQ X