Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright 2005-2014 Red Hat, Inc. Red Hat licenses this file to you under the Apache License, version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 package io.fabric8.camel;
 
 
 import java.util.Map;
 import java.util.Set;

Creates an endpoint which uses FABRIC to map a logical name to physical endpoint names
 
 public class FabricLocatorEndpoint extends DefaultEndpoint implements GroupListener<CamelNodeState> {
     private static final transient Log LOG = LogFactory.getLog(FabricLocatorEndpoint.class);
 
     private final FabricComponent component;
     private final Group<CamelNodeStategroup;
 
     private LoadBalancer loadBalancer;
     private final Map<StringProcessorprocessors = new HashMap<StringProcessor>();
 
 
     public FabricLocatorEndpoint(String uriFabricComponent componentString singletonId) {
         super(uricomponent);
         this. = component;
 
         String path = getComponent().getFabricPath(singletonId);
          = getComponent().createGroup(path);
         .add(this);
     }
 
     @Override
     public synchronized void groupEvent(Group<CamelNodeStategroupGroupEvent event) {
         Map<StringCamelNodeStatemembers;
         if (event == . || !isStarted()) {
             members = Collections.emptyMap();
         } else {
             members = group.members();
         }
         //Find what has been removed.
         Set<Stringremoved = new LinkedHashSet<String>();
 
         for (Map.Entry<StringProcessorentry : .entrySet()) {
             String key = entry.getKey();
             if (!members.containsKey(key)) {
                 removed.add(key);
             }
         }
 
         //Add existing processors
         for (Map.Entry<StringCamelNodeStateentry : members.entrySet()) {
             try {
                 String key = entry.getKey();
                 if (!.containsKey(key)) {
                     Processor p = getProcessor(entry.getValue().);
                     .put(keyp);
                     .addProcessor(p);
                 }
             } catch (URISyntaxException e) {
                 .warn("Unable to add endpoint " + entry.getValue().e);
             }
         }
 
         //Update the list by removing old and adding new.
         for (String key : removed) {
             Processor p = .remove(key);
             .removeProcessor(p);
         }
    }
    @SuppressWarnings("unchecked")
    public Producer createProducer() throws Exception {
        final FabricLocatorEndpoint endpoint = this;
        return new DefaultProducer(endpoint) {
            public void process(Exchange exchangethrows Exception {
                .process(exchange);
            }
        };
    }
    public Consumer createConsumer(Processor processorthrows Exception {
        throw new UnsupportedOperationException("You cannot consume from a FABRIC endpoint using just its fabric name directly, you must use fabric:name:someActualUri instead");
    }
    public boolean isSingleton() {
        return true;
    }
    @Override
    public void start() throws Exception {
        super.start();
        if ( == null) {
             = createLoadBalancer();
        }
        .start();
    }
    @Override
    public void stop() throws Exception {
        super.stop();
        .close();
    }
    public Processor getProcessor(String urithrows URISyntaxException {
        uri = ZooKeeperUtils.getSubstitutedData(.getCurator(), uri);
        .info("Creating endpoint for " + uri);
        final Endpoint endpoint = getCamelContext().getEndpoint(uri);
        return new Processor() {
            public void process(Exchange exchangethrows Exception {
                ProducerCache producerCache = .getProducerCache();
                Producer producer = producerCache.acquireProducer(endpoint);
                try {
                    producer.process(exchange);
                } finally {
                    producerCache.releaseProducer(endpointproducer);
                }
            }
            @Override
            public String toString() {
                return "Producer for " + endpoint;
            }
        };
    }
    // Properties
    //-------------------------------------------------------------------------
    public FabricComponent getComponent() {
        return ;
    }
        if ( == null) {
        }
        return ;
    }
    public void setLoadBalancerFactory(LoadBalancerFactory loadBalancerFactory) {
        this. = loadBalancerFactory;
    }
    public LoadBalancer createLoadBalancer() {
        return getLoadBalancerFactory().createLoadBalancer();
    }
New to GrepCode? Check out our FAQ X