Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.facebook.presto.failureDetector;
 
 
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
 import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.airlift.http.client.Request.Builder.prepareHead;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 
         implements FailureDetector
 {
     private static final Logger log = Logger.get(HeartbeatFailureDetector.class);
 
     private final ServiceSelector selector;
     private final HttpClient httpClient;
     private final NodeInfo nodeInfo;
 
     private final ScheduledExecutorService executor = newSingleThreadScheduledExecutor(daemonThreadsNamed("failure-detector"));
 
     // monitoring tasks by service id
     private final ConcurrentMap<UUIDMonitoringTasktasks = new ConcurrentHashMap<>();
 
     private final double failureRatioThreshold;
     private final Duration heartbeat;
     private final boolean isEnabled;
     private final Duration warmupInterval;
     private final Duration gcGraceInterval;
 
     private final AtomicBoolean started = new AtomicBoolean();
 
     @Inject
     public HeartbeatFailureDetector(
             @ServiceType("presto"ServiceSelector selector,
             @ForFailureDetector HttpClient httpClient,
             FailureDetectorConfig config,
             NodeInfo nodeInfo)
     {
         checkNotNull(selector"selector is null");
         checkNotNull(httpClient"httpClient is null");
         checkNotNull(nodeInfo"nodeInfo is null");
         checkNotNull(config"config is null");
         checkArgument(config.getHeartbeatInterval().toMillis() >= 1, "heartbeat interval must be >= 1ms");
 
         this. = selector;
         this. = httpClient;
        this. = nodeInfo;
        this. = config.getFailureRatioThreshold();
        this. = config.getHeartbeatInterval();
        this. = config.getWarmupInterval();
        this. = config.getExpirationGraceInterval();
        this. = config.isEnabled();
    }
    public void start()
    {
        if ( && .compareAndSet(falsetrue)) {
            .scheduleWithFixedDelay(new Runnable()
            {
                @Override
                public void run()
                {
                    try {
                        updateMonitoredServices();
                    }
                    catch (Throwable e) {
                        // ignore to avoid getting unscheduled
                        .warn(e"Error updating services");
                    }
                }
            }, 0, 5, .);
        }
    }
    @PreDestroy
    public void shutdown()
    {
        .shutdownNow();
    }
    @Override
    public Set<ServiceDescriptorgetFailed()
    {
        return .values().stream()
                .filter(MonitoringTask::isFailed)
                .map(MonitoringTask::getService)
                .collect(toImmutableSet());
    }
    @Managed(description = "Number of failed services")
    public int getFailedCount()
    {
        return getFailed().size();
    }
    @Managed(description = "Total number of known services")
    public int getTotalCount()
    {
        return .size();
    }
    @Managed
    public int getActiveCount()
    {
        return .size() - getFailed().size();
    }
    public Map<ServiceDescriptorStatsgetStats()
    {
        ImmutableMap.Builder<ServiceDescriptorStatsbuilder = ImmutableMap.builder();
        for (MonitoringTask task : .values()) {
            builder.put(task.getService(), task.getStats());
        }
        return builder.build();
    }
    {
        Set<ServiceDescriptoronline = .selectAllServices().stream()
                .filter(descriptor -> !.getNodeId().equals(descriptor.getNodeId()))
                .collect(toImmutableSet());
        Set<UUIDonlineIds = online.stream()
                .map(ServiceDescriptor::getId)
                .collect(toImmutableSet());
        // make sure only one thread is updating the registrations
        synchronized () {
            // 1. remove expired tasks
            List<UUIDexpiredIds = .values().stream()
                    .filter(MonitoringTask::isExpired)
                    .map(MonitoringTask::getService)
                    .map(ServiceDescriptor::getId)
                    .collect(toImmutableList());
            .keySet().removeAll(expiredIds);
            // 2. disable offline services
            .values().stream()
                    .filter(task -> !onlineIds.contains(task.getService().getId()))
                    .forEach(MonitoringTask::disable);
            // 3. create tasks for new services
            Set<ServiceDescriptornewServices = online.stream()
                    .filter(service -> !.keySet().contains(service.getId()))
                    .collect(toImmutableSet());
            for (ServiceDescriptor service : newServices) {
                URI uri = getHttpUri(service);
                if (uri != null) {
                    .put(service.getId(), new MonitoringTask(serviceuri));
                }
            }
            // 4. enable all online tasks (existing plus newly created)
            .values().stream()
                    .filter(task -> onlineIds.contains(task.getService().getId()))
                    .forEach(MonitoringTask::enable);
        }
    }
    private static URI getHttpUri(ServiceDescriptor service)
    {
        try {
            String uri = service.getProperties().get("http");
            if (uri != null) {
                return new URI(uri);
            }
        }
        catch (URISyntaxException e) {
            // ignore, not a valid http uri
        }
        return null;
    }
    @ThreadSafe
    private class MonitoringTask
    {
        private final ServiceDescriptor service;
        private final URI uri;
        private final Stats stats;
        @GuardedBy("this")
        private ScheduledFuture<?> future;
        @GuardedBy("this")
        private Long disabledTimestamp;
        @GuardedBy("this")
        private Long successTransitionTimestamp;
        private MonitoringTask(ServiceDescriptor serviceURI uri)
        {
            this. = uri;
            this. = service;
            this. = new Stats(uri);
        }
        public Stats getStats()
        {
            return ;
        }
        public ServiceDescriptor getService()
        {
            return ;
        }
        public synchronized void enable()
        {
            if ( == null) {
                 = .scheduleAtFixedRate(new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try {
                            ping();
                            updateState();
                        }
                        catch (Throwable e) {
                            // ignore to avoid getting unscheduled
                            .warn(e"Error pinging service %s (%s)".getId(), );
                        }
                    }
                }, .toMillis(), .toMillis(), .);
                 = null;
            }
        }
        public synchronized void disable()
        {
            if ( != null) {
                .cancel(true);
                 = null;
                 = System.nanoTime();
            }
        }
        public synchronized boolean isExpired()
        {
            return  == null &&  != null && Duration.nanosSince().compareTo() > 0;
        }
        public synchronized boolean isFailed()
        {
            return  == null || // are we disabled?
                     == null || // are we in success state?
                    Duration.nanosSince().compareTo() < 0; // are we within the warmup period?
        }
        private void ping()
        {
            try {
                .recordStart();
                .executeAsync(prepareHead().setUri().build(), new ResponseHandler<ObjectException>()
                {
                    @Override
                    public Exception handleException(Request requestException exception)
                    {
                        // ignore error
                        .recordFailure(exception);
                        // TODO: this will technically cause an NPE in httpClient, but it's not triggered because
                        // we never call get() on the response future. This behavior needs to be fixed in airlift
                        return null;
                    }
                    @Override
                    public Object handle(Request requestResponse response)
                            throws Exception
                    {
                        .recordSuccess();
                        return null;
                    }
                });
            }
            catch (RuntimeException e) {
                .warn(e"Error scheduling request for %s");
            }
        }
        private synchronized void updateState()
        {
            // is this an over/under transition?
            if (.getRecentFailureRatio() > ) {
                 = null;
            }
            else if ( == null) {
                 = System.nanoTime();
            }
        }
    }
    public static class Stats
    {
        private final long start = System.nanoTime();
        private final URI uri;
        private final DecayCounter recentRequests = new DecayCounter(ExponentialDecay.oneMinute());
        private final DecayCounter recentFailures = new DecayCounter(ExponentialDecay.oneMinute());
        private final DecayCounter recentSuccesses = new DecayCounter(ExponentialDecay.oneMinute());
        private final AtomicReference<DateTimelastRequestTime = new AtomicReference<>();
        private final AtomicReference<DateTimelastResponseTime = new AtomicReference<>();
        @GuardedBy("this")
        private final Map<Class<? extends Throwable>, DecayCounterfailureCountByType = new HashMap<>();
        public Stats(URI uri)
        {
            this. = uri;
        }
        public void recordStart()
        {
            .add(1);
            .set(new DateTime());
        }
        public void recordSuccess()
        {
            .add(1);
            .set(new DateTime());
        }
        public void recordFailure(Exception exception)
        {
            .add(1);
            .set(new DateTime());
            Throwable cause = exception;
            while (cause.getClass() == RuntimeException.class && cause.getCause() != null) {
                cause = cause.getCause();
            }
            synchronized (this) {
                DecayCounter counter = .get(cause.getClass());
                if (counter == null) {
                    counter = new DecayCounter(ExponentialDecay.oneMinute());
                    .put(cause.getClass(), counter);
                }
                counter.add(1);
            }
        }
        @JsonProperty
        public Duration getAge()
        {
            return Duration.nanosSince();
        }
        @JsonProperty
        public URI getUri()
        {
            return ;
        }
        @JsonProperty
        public double getRecentFailures()
        {
            return .getCount();
        }
        @JsonProperty
        public double getRecentSuccesses()
        {
            return .getCount();
        }
        @JsonProperty
        public double getRecentRequests()
        {
            return .getCount();
        }
        @JsonProperty
        public double getRecentFailureRatio()
        {
            return .getCount() / .getCount();
        }
        @JsonProperty
        public DateTime getLastRequestTime()
        {
            return .get();
        }
        @JsonProperty
        public DateTime getLastResponseTime()
        {
            return .get();
        }
        @JsonProperty
        public synchronized Map<StringDoublegetRecentFailuresByType()
        {
            ImmutableMap.Builder<StringDoublebuilder = ImmutableMap.builder();
            for (Map.Entry<Class<? extends Throwable>, DecayCounterentry : .entrySet()) {
                builder.put(entry.getKey().getName(), entry.getValue().getCount());
            }
            return builder.build();
        }
    }
New to GrepCode? Check out our FAQ X