Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package io.advantageous.qbit.metrics;
  
 
 import java.util.List;
 
 import static io.advantageous.boon.core.Str.sputs;


 /**
  * Clustered Stat Replicator, created by rhightower on 3/24/15.
  */
 
 
 
     // Instance state of the replicator.
     // NOTE(review): this file was damaged during extraction. Receivers and operands are missing
     // throughout, so the comments below describe only what the remaining text shows.

     // Same name and type as the constructor parameter; presumably assigned from it — confirm.
     private final ServiceDiscovery serviceDiscovery;
     // NOTE(review): the declaration that this initializer belongs to (modifiers and type) is missing.
             replicatorsMap = new ConcurrentHashMap<>();
     private final Logger logger = LoggerFactory.getLogger(ClusteredStatReplicator.class);
     // Logging switches evaluated once at construction time; the operands/receivers are stripped.
     private final boolean debug = . || .isDebugEnabled();
     private final boolean trace = .isTraceEnabled();
     // The next six fields share names with constructor parameters (servicePool is built inside the
     // constructor from serviceName); presumably assigned there — confirm.
     private final String serviceName;
     private final ServicePool servicePool;
     private final String localServiceId;
     private final Timer timer;
     private final int tallyInterval;
     private final int flushInterval;
     // Time bookkeeping. Units are not visible in this file — TODO confirm (the literal 60_000 in
     // checkForReconnect() suggests milliseconds).
     private long currentTime;
     private long lastReconnectTime;
     private long lastSendTime;
     private long lastReplicatorFlush = 0;
     // NOTE(review): the generic arguments are fused; presumably a map from String to LocalCount
     // named countMap — confirm.
     private final ConcurrentHashMap<StringLocalCountcountMap = new ConcurrentHashMap<>();
 
 
     /**
      * Builds a replicator from the supplied collaborators and interval settings.
      * <p>
      * NOTE(review): every assignment target below was lost in extraction (each line reads
      * {@code this. = ...}). Presumably each value is stored in the field of the same name —
      * confirm against the original source.
      *
      * @param serviceName            service name; also passed to the {@code ServicePool} created here
      * @param serviceDiscovery       service discovery collaborator
      * @param statReplicatorProvider provider of {@code StatReplicator} instances
      * @param localServiceId         id of the local service (presumably used to skip the local
      *                               endpoint in addService — confirm)
      * @param timer                  timer collaborator
      * @param tallyInterval          tally interval; units not visible here — TODO confirm
      * @param flushInterval          flush interval; units not visible here — TODO confirm
      */
     public ClusteredStatReplicator(final String serviceName,
                                    final ServiceDiscovery serviceDiscovery,
                                    final StatReplicatorProvider statReplicatorProvider,
                                    final String localServiceId,
                                    final Timer timer,
                                    final int tallyInterval,
                                    final int flushInterval) {
         this. = serviceDiscovery;
         this. = statReplicatorProvider;
         this. = serviceName;
         this. = localServiceId;
         // NOTE(review): the ServicePool arguments are fused; presumably (serviceName, null) — confirm.
         this. = new ServicePool(serviceNamenull);
         this. = timer;
         this. = tallyInterval;
         this. = flushInterval;
 
     }
 
     /**
      * Adds {@code count} to the locally held {@link LocalCount} for {@code name}, creating and
      * registering the entry the first time the name is seen. Nothing is forwarded from this method.
      *
      * @param name  stat name, used as the lookup key
      * @param count amount added to the running tally
      * @param now   timestamp; within this method it appears only in the log calls
      */
     @Override
     public void replicateCount(final String namefinal int countfinal long now) {
 
         // NOTE(review): the guard and logger receiver are stripped; presumably a trace-level log
         // taken only when tracing is enabled — confirm.
         if (.trace(sputs("ClusteredStatReplicator::replicateCount()",
                 namecountnow));
 
         // NOTE(review): condition and receivers are stripped. As written, a debug message is logged
         // only when some collection is empty.
         if () {
             if (.size() == 0) {
                 .debug(sputs("ClusteredStatReplicator::replicateCount"namecountnow));
             }
         }
 
         // Receiver stripped; presumably the countMap field — confirm.
         LocalCount localCount = .get(name);
 
         // First time this name is seen: create the tally and register it under the name.
         if (localCount == null) {
             localCount = new LocalCount();
             localCount.name = name;
             .put(namelocalCount);
         }
         // Counts accumulate (+=), unlike replicateLevel/replicateTiming which overwrite.
         localCount.count += count;
 
 
     }
 
     /**
      * Sets the locally held {@link LocalCount} for {@code name} to {@code level}, creating and
      * registering the entry the first time the name is seen.
      *
      * @param name  stat name, used as the lookup key
      * @param level value that replaces the stored count
      * @param time  not referenced in the visible body
      */
     @Override
     public void replicateLevel(String nameint levellong time) {
 
         // Receiver stripped; presumably the countMap field — confirm.
         LocalCount localCount = .get(name);
 
         if (localCount == null) {
             localCount = new LocalCount();
             localCount.name = name;
             .put(namelocalCount);
        }
        // Overwrites rather than accumulates.
        localCount.count = level;
    }
    /**
     * Sets the locally held {@link LocalCount} for {@code name} to {@code level}, creating and
     * registering the entry the first time the name is seen.
     * <p>
     * NOTE(review): the visible body is identical to replicateLevel, and the value parameter is
     * named {@code level} although the method name says "timing" — confirm this is intended.
     *
     * @param name  stat name, used as the lookup key
     * @param level value that replaces the stored count
     * @param time  not referenced in the visible body
     */
    @Override
    public void replicateTiming(String nameint levellong time) {
        // Receiver stripped; presumably the countMap field — confirm.
        LocalCount localCount = .get(name);
        if (localCount == null) {
            localCount = new LocalCount();
            localCount.name = name;
            .put(namelocalCount);
        }
        // Overwrites rather than accumulates.
        localCount.count = level;
    }
    /**
     * Forwards one count to a single replicator, logging (not propagating) any exception it throws.
     *
     * @param statReplicator pair whose second element receives the {@code replicateCount} call
     * @param name           stat name to forward
     * @param count          count to forward
     * @param now            timestamp to forward
     */
    private void doRecordCount(Pair<EndpointDefinitionStatReplicatorstatReplicator,
                               final String namefinal int countfinal long now) {
        try {
            statReplicator.getSecond().replicateCount(namecountnow);
        } catch (Exception ex) {
            // A failure is logged at error level and the method returns normally.
            .error(sputs("ClusteredStatReplicator::Replicator failed"statReplicator), ex);
        }
    }
            // NOTE(review): the two fragments below look like the tail of a multi-value annotation on
            // process() whose name and constants were lost in extraction — restore from the original.
            .,
            .})
    void process() {
        // Store the value returned by now(); both sides are stripped (presumably
        // currentTime = timer.now() — confirm).
         = .now();
        // Send accumulated counts if due, then check whether a reconnect is due.
        sendIfNeeded();
        checkForReconnect();
    }
    /**
     * When more than a threshold has elapsed, forwards every positive local count to each
     * replicator, resets all local counts to zero, and then calls flushReplicatorsAll().
     * <p>
     * NOTE(review): the operands of the elapsed-time check and its threshold are stripped;
     * presumably currentTime - lastSendTime compared with tallyInterval — confirm.
     */
    private void sendIfNeeded() {
        long duration =  - ;
        if (duration > ) {
            this. = ;
            final Collection<LocalCountcountCollection = this..values();
            for (LocalCount localCount : countCollection) {
                // Only positive counts are forwarded.
                if (localCount.count > 0) {
                    // NOTE(review): three arguments are visible here while doRecordCount declares
                    // four parameters; the last one was presumably stripped.
                    .forEach(
                            statReplicator -> doRecordCount(statReplicatorlocalCount.namelocalCount.count)
                    );
                }
                // Reset for every entry, whether or not it was forwarded.
                localCount.count = 0;
            }
            // Clears a collection once it holds more than 10,000,000 entries (receiver stripped;
            // presumably countMap — confirm).
            if (.size() > 10_000_000) {
                .clear();
            }
            flushReplicatorsAll();
        }
    }
    /**
     * When the (stripped) elapsed-time check passes, flushes each replicator through
     * flushReplicator(), then stops every replicator that failed to flush and removes it from two
     * collections.
     * <p>
     * NOTE(review): the time operands and collection receivers are stripped; presumably
     * currentTime - lastReplicatorFlush compared with flushInterval — confirm.
     */
    private void flushReplicatorsAll() {
        if ( -  > ) {
             = ;
            // Filled by flushReplicator() with the replicators whose flush threw.
            final List<Pair<EndpointDefinitionStatReplicator>> badReplicators = new ArrayList<>();
            .forEach(
                    statReplicator -> flushReplicator(statReplicatorbadReplicators)
            );
            badReplicators.forEach(statReplicator -> {
                        try {
                            statReplicator.getSecond().stop();
                        } catch (Exception ex) {
                            // A stop failure is logged at info level; removal below still happens.
                            .info("Failed to stop failed node"ex);
                        }
                        // Removed once by the pair itself and once by endpoint id (receivers stripped).
                        .remove(statReplicator);
                        .remove(statReplicator.getFirst().getId());
                    }
            );
            // Condition stripped; logs how many replicators failed, then each of them.
            if () {
                .trace(sputs("ClusteredStatReplicator::flushReplicatorsAll()",
                        badReplicators.size()));
                badReplicators.forEach(statReplicator -> .debug(sputs(statReplicator)));
            }
        }
    }
    /**
     * Calls doCheckReconnect() when the computed duration exceeds 60_000.
     * <p>
     * NOTE(review): both operands of the subtraction are stripped; presumably
     * currentTime - lastReconnectTime — confirm.
     */
    private void checkForReconnect() {
        long duration =  - ;
        if (duration > 60_000) {
            doCheckReconnect();
        }
    }
    /**
     * Compares the number of known services, minus one, with the size of a replicator collection.
     * When they differ, shuts down all replicators and calls addService for every service.
     * <p>
     * NOTE(review): receivers are stripped. The "- 1" presumably accounts for the local service,
     * which addService skips — confirm.
     */
    public void doCheckReconnect() {
        // Both sides stripped; presumably records the time of this check — confirm.
         = ;
        final List<EndpointDefinitionservices = .services();
        if ((services.size() - 1) != this..size()) {
            .info(sputs("DOING RECONNECT"services.size() - 1,
                    this..size()));
            // Rebuild from scratch: stop everything, then add each service again.
            shutDownReplicators();
            services.forEach(this::addService);
        }
    }
    /**
     * Stops every replicator in the iterated collection, logging (not propagating) failures, and
     * then clears two collections.
     * <p>
     * NOTE(review): the log guards, the iterated collection, and the receivers of both clear()
     * calls are stripped.
     */
    private void shutDownReplicators() {
        if (.debug("Shutting down replicators");
        for (Pair<EndpointDefinitionStatReplicatorstatReplicator : ) {
            try {
                statReplicator.getSecond().stop();
            } catch (Exception ex) {
                // A stop failure is logged at debug level; the loop continues.
                .debug("Shutdown replicator failed"ex);
            }
            if (.debug("Shutting down replicator");
        }
        .clear();
        .clear();
    }
    /**
     * Flushes one replicator through {@code ServiceProxyUtils.flushServiceProxy}. If that throws,
     * the pair is added to {@code badReplicators} and the failure is logged at info level.
     *
     * @param statReplicator pair whose second element is flushed
     * @param badReplicators list that receives the pair when the flush throws
     */
    private void flushReplicator(final Pair<EndpointDefinitionStatReplicatorstatReplicator,
                                 final List<Pair<EndpointDefinitionStatReplicator>> badReplicators) {
        try {
            ServiceProxyUtils.flushServiceProxy(statReplicator.getSecond());
        } catch (Exception exception) {
            // The exception is not rethrown; the caller handles whatever ends up in badReplicators.
            badReplicators.add(statReplicator);
            .info("Replicator failed " + statReplicatorexception);
        }
    }

    
    /**
     * Event handler.
     *
     * @param serviceName service name
     */
    // Handles a service-pool change notification. When the equality check against the incoming
    // serviceName passes, logs at info level and calls updateServicePool(serviceName); otherwise a
    // debug message is logged under a condition that is stripped.
    // NOTE(review): the receiver of equals() is stripped; presumably this.serviceName — confirm.
    @Override
    public void servicePoolChanged(final String serviceName) {
        if (this..equals(serviceName)) {
            .info("ClusteredStatReplicator::servicePoolChanged({})"serviceName);
            updateServicePool(serviceName);
        } else if () {
            .debug("ClusteredStatReplicator::servicePoolChanged({})",
                    "got event for another service"serviceName);
        }
    }
    /**
     * Loads the services for {@code serviceName} and passes them to {@code setHealthyNodes} together
     * with a listener that calls addService / removeService for each reported endpoint.
     * <p>
     * NOTE(review): the receivers of loadServices and setHealthyNodes are stripped; presumably
     * serviceDiscovery and servicePool respectively — confirm.
     *
     * @param serviceName name of the service whose nodes are loaded
     */
    private void updateServicePool(final String serviceName) {
        try {
            final List<EndpointDefinitionnodes = .loadServices(serviceName);
            .setHealthyNodes(nodesnew ServicePoolListener() {
                // Deliberately empty: only the per-endpoint callbacks below do work.
                @Override
                public void servicePoolChanged(String serviceName) {
                }
                @Override
                public void serviceAdded(String serviceNameEndpointDefinition endpointDefinition) {
                    addService(endpointDefinition);
                }
                @Override
                public void serviceRemoved(String serviceNameEndpointDefinition endpointDefinition) {
                    removeService(endpointDefinition);
                }
            });
        }catch (Exception ex) {
            // NOTE(review): ex is not passed to the log call as written, so the failure cause is
            // not part of this log entry. Consider passing ex as the last argument.
            .error("Error updating service pool");
        }
    }
    /**
     * Stops and unregisters the replicator registered under the endpoint's id.
     * <p>
     * Returns early, after logging an error, when no pair is registered for that id or when the
     * pair's second element is null. Otherwise it stops the replicator (a failure is logged and
     * does not prevent removal), removes the id from a map, and rebuilds a list from map values.
     * <p>
     * NOTE(review): the map and list receivers are stripped; presumably replicatorsMap and a list
     * of replicators — confirm.
     *
     * @param endpointDefinition endpoint whose replicator should be removed
     */
    private void removeService(final EndpointDefinition endpointDefinition) {
        .info(sputs("ClusteredStatReplicator::removeService()",
                endpointDefinition" replicator count ".size()));
        final Pair<EndpointDefinitionStatReplicatorstatReplicatorPair = this..get(endpointDefinition.getId());
        // Unknown endpoint id: log and stop.
        if (statReplicatorPair == null) {
            .error(sputs("ClusteredStatReplicator::removeService() Trying to remove a service that we are not managing",
                    "END POINT ID"endpointDefinition.getId(), " replicator count ".size()));
            return;
        }
        // Pair exists but holds no replicator: log and stop.
        // NOTE(review): the message below contains the typo "nto" and joins "managing" and "and"
        // without a space. It is a runtime string, so it is left unchanged here.
        if (statReplicatorPair.getSecond() == null) {
            .error(sputs("ClusteredStatReplicator::removeService() Trying to remove a service that we are nto managing" +
                            "and the getSecond() is null",
                    "END POINT ID"endpointDefinition.getId(), " replicator count ".size()));
            return;
        }
        try {
            statReplicatorPair.getSecond().stop();
        } catch (Exception ex) {
            .error("Unable to stop service endpoint that was removed " + endpointDefinitionex);
        }
        this..remove(endpointDefinition.getId());
        // A fresh list is built from the map's values after the removal.
        this. = new ArrayList<>(.values());
        .info(sputs("ClusteredStatReplicator::removeService() removed",
                endpointDefinition" replicator count ".size()));
    }
    /**
     * Obtains a {@code StatReplicator} for the endpoint via {@code provide(endpointDefinition)},
     * registers it under the endpoint's id, and rebuilds a list from the map's values.
     * <p>
     * Returns immediately when the endpoint id equals a value whose expression is stripped
     * (presumably localServiceId, so the local node gets no replicator — confirm).
     *
     * @param endpointDefinition endpoint to create a replicator for
     */
    private void addService(final EndpointDefinition endpointDefinition) {
        if (endpointDefinition.getId().equals()) {
            return;
        }
        .info(sputs("ClusteredStatReplicator::addService()"endpointDefinition,
                " replicator count ".size()));
        final StatReplicator statReplicator = .provide(endpointDefinition);
        // Registered as an (endpoint, replicator) pair keyed by endpoint id.
        this..put(endpointDefinition.getId(), Pair.pair(endpointDefinitionstatReplicator));
        this. = new ArrayList<>(.values());
        .info(sputs("ClusteredStatReplicator::addService() added",
                endpointDefinition" replicator count ".size()));
    }
    /**
     * Delegates to {@link #process()}.
     */
    @Override
    public void flush() {
        process();
    }
    final static class LocalCount {
        int count;
        String name;
    }
New to GrepCode? Check out our FAQ X