Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
Copyright 2014 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
 
 
 
 
 
 package com.cisco.oss.foundation.directory.impl;
 
 import java.util.List;
 import java.util.Map;
 
 
 import static com.cisco.oss.foundation.directory.ServiceDirectory.getServiceDirectoryConfig;

The DirectoryRegistrationService with heartbeat and ServiceInstanceHealth callback checking. The HeartbeatDirectoryRegistrationService will cache the registered ProvidedServiceInstance at the startup, and send heartbeats, and perform ServiceInstanceHealth callback for the monitorEnabled ProvidedServiceInstance.
 
 public class HeartbeatDirectoryRegistrationService extends
         DirectoryRegistrationService implements Stoppable {
 
     private static final Logger LOGGER = LoggerFactory
             .getLogger(HeartbeatDirectoryRegistrationService.class);

    
The RegistrationManager health check executor kickoff delay time property name in seconds.
 
     public static final String SD_API_REGISTRY_HEALTH_CHECK_DELAY_PROPERTY = "com.cisco.oss.foundation.directory.registry.health.check.delay";

    
The default delay time of health check executor kickoff.
 
     public static final int SD_API_REGISTRY_HEALTH_CHECK_DELAY_DEFAULT = 1;

    
The RegistrationManager health check interval property name in seconds.
 
     public static final String SD_API_REGISTRY_HEALTH_CHECK_INTERVAL_PROPERTY = "com.cisco.oss.foundation.directory.registry.health.check.interval";

    
The default health check interval value of RegistrationManager.
 
     public static final int SD_API_REGISTRY_HEALTH_CHECK_INTERVAL_DEFAULT = 5;

    
The RegistrationManager heartbeat executor kickoff delay time property name in seconds.
 
     public static final String SD_API_HEARTBEAT_DELAY_PROPERTY = "com.cisco.oss.foundation.directory.heartbeat.delay";

    
The default delay time of RegistrationManager heartbeat executor kickoff
 
     public static final int SD_API_HEARTBEAT_DELAY_DEFAULT = 1;

    
The RegistrationManager send ServiceInstance heartbeat interval property name.
 
     public static final String SD_API_HEARTBEAT_INTERVAL_PROPERTY = "com.cisco.oss.foundation.directory.heartbeat.interval";

    
The default interval value of RegistrationManager send ServiceInstance heartbeats.
    public static final int SD_API_HEARTBEAT_INTERVAL_DEFAULT = 10;

    
The CachedProviderServiceInstance Set
ServiceInstanceHealth check ExecutorService.
    private final ScheduledExecutorService healthCheckService;

    
The heartbeat executor service.
    private final ScheduledExecutorService heartbeatService;

    
Mark whether component is started?
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    
The Read and Write lock to protect the CachedProviderServiceInstance Set.
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    
The Read lock.
    private final Lock read = .readLock();

    
The Write lock.
    private final Lock write = .writeLock();

    
Start the component.
    @Override
    public void start() {
        if(.compareAndSet(false,true)){
            scheduleTasks();
        }
    }

    
Stop the Component. it is idempotent, it can be invoked in multiple times.
    @Override
    public void stop() {
        if(.compareAndSet(true,false)){
            .shutdown();
            .shutdown();
        }
    }

    
Constructor.

Parameters:
directoryServiceClient the DirectoryServiceClientManager to get DirectoryServiceClient.
            DirectoryServiceClient directoryServiceClient) {
        super(directoryServiceClient);
         = Executors
                .newSingleThreadScheduledExecutor(new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("SD API RegistryHealth Check");
                        t.setDaemon(true);
                        return t;
                    }
                });
         = Executors
                .newSingleThreadScheduledExecutor(new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("SD API Heartbeat");
                        t.setDaemon(true);
                        return t;
                    }
                });
     }

    
    @Override
    public void registerService(ProvidedServiceInstance serviceInstance) {
        super.registerService(serviceInstance);
        if(serviceInstance.isMonitorEnabled()){
            registerCachedServiceInstance(serviceInstancenull);
        }
    }

    
    @Override
    public void registerService(ProvidedServiceInstance serviceInstanceServiceInstanceHealth registryHealth) {
        super.registerService(serviceInstanceregistryHealth);
        registerCachedServiceInstance(serviceInstanceregistryHealth);
    }

    
    @Override
    public void updateServiceUri(String serviceNameString providerIdString uri){
        boolean isOwned = false;
        CachedProviderServiceInstance inst = getCachedServiceInstance(serviceNameproviderId);
        if(inst != null){
            isOwned = true;
        }
        this.getServiceDirectoryClient().updateInstanceUri(serviceNameproviderIduriisOwned);
    }

    
    @Override
    public void updateServiceOperationalStatus(String serviceName,
            String providerIdOperationalStatus status) {
        boolean isOwned = false;
        CachedProviderServiceInstance inst = getCachedServiceInstance(serviceNameproviderId);
        if(inst != null){
            isOwned = true;
            inst.setStatus(status);
        }
        this.getServiceDirectoryClient().updateInstanceStatus(serviceNameproviderIdstatusisOwned);
    }

    
    @Override
    public void updateService(ProvidedServiceInstance serviceInstance) {
        if(serviceInstance.isMonitorEnabled()){
            CachedProviderServiceInstance inst = getCachedServiceInstance(serviceInstance.getServiceName(),
                    serviceInstance.getProviderId());
            if(inst == null){
                throw new ServiceException(.);
            }
            this.editCachedServiceInstance(serviceInstance);
        }
        super.updateService(serviceInstance);
    }

    
    @Override
    public void unregisterService(String serviceNameString providerId) {
        boolean isOwned = false;
        CachedProviderServiceInstance inst = getCachedServiceInstance(serviceNameproviderId);
        if(inst != null){
            isOwned = true;
            this.unregisterCachedServiceInstance(serviceNameproviderId);
        }
        this.getServiceDirectoryClient().unregisterInstance(serviceNameproviderIdisOwned);
    }

    
Register a ProvidedServiceInstance to the Cache. Add/update the ProvidedServiceInstance in the cache. It is thread safe.

Parameters:
instance the ProvidedServiceInstance.
registryHealth the ServiceInstanceHealth callback.
    private void registerCachedServiceInstance(ProvidedServiceInstance instanceServiceInstanceHealth registryHealth) {
        try {
            .lock();
            ServiceInstanceId id = new ServiceInstanceId(instance.getServiceName(),
                    instance.getProviderId());
            CachedProviderServiceInstance cachedInstance = getCacheServiceInstances().get(id);
            if(cachedInstance == null){
                cachedInstance = new CachedProviderServiceInstance(
                    instance);
                getCacheServiceInstances().put(idcachedInstance);
            }
            if(.isDebugEnabled()){
                .debug("add cached ProvidedServiceInstance: {}."cachedInstance.toString());
            }
            cachedInstance.setServiceInstanceHealth(registryHealth);
        } finally {
            .unlock();
        }
    }

    
Edit the Cached ProvidedServiceInstance when updateService is called. It is thread safe.

Parameters:
instance the ProvidedServiceInstance.
    private void editCachedServiceInstance(ProvidedServiceInstance instance) {
        try {
            .lock();
            ServiceInstanceId id = new ServiceInstanceId(instance.getServiceName(),
                    instance.getProviderId());
            CachedProviderServiceInstance cachedInstance = getCacheServiceInstances().get(id);
            if(cachedInstance != null){
                cachedInstance.setMonitorEnabled(instance.isMonitorEnabled());
                cachedInstance.setStatus(instance.getStatus());
            }
            
            .debug("update cached ProvidedServiceInstance: {}."cachedInstance.toString());
        } finally {
            .unlock();
        }
    }

    
Get the Cached ProvidedServiceInstance by serviceName and providerId. It is thread safe.

Parameters:
serviceName the serviceName
providerId the providerId
Returns:
the CachedProviderServiceInstance
            String serviceNameString providerId) {
        try {
            .lock();
            ServiceInstanceId id = new ServiceInstanceId(serviceNameproviderId);
            return getCacheServiceInstances().get(id);
        } finally {
            .unlock();
        }
    }

    
Delete the Cached ProvidedServiceInstance by serviceName and providerId. It is thread safe.

Parameters:
serviceName the serviceName.
providerId the providerId.
    private void unregisterCachedServiceInstance(
            String serviceNameString providerId) {
        try {
            .lock();
            ServiceInstanceId id = new ServiceInstanceId(serviceNameproviderId);
            getCacheServiceInstances().remove(id);
            .debug(
                    "delete cached ProvidedServiceInstance, serviceName={}, providerId={}.",
                    serviceNameproviderId);
        } finally {
            .unlock();
        }
    }

    
Get the CachedProviderServiceInstance Set. It is lazy initialized and thread safe.

Returns:
the CachedProviderServiceInstance Set.
       return ;
    }

    
schedule the Heartbeat task and Health Check task.
    private void scheduleTasks() {
        int rhDelay = getServiceDirectoryConfig().getInt(
                ,
                );
        int rhInterval = getServiceDirectoryConfig().getInt(
                ,
                );
        .info(
                "Start the SD API RegistryHealth Task scheduler, delay={}, interval={}.",
                rhDelayrhInterval);
                rhInterval.);
                );
        int hbInterval = getServiceDirectoryConfig().getInt(
                ,
                );
        .info(
                "Start the SD API Heartbeat Task scheduler, delay={}, interval={}.",
                hbDelayhbInterval);
        .scheduleAtFixedRate(new HeartbeatTask(), hbDelay,
                hbInterval.);
    }

    
The ServiceInstanceHealth checking task.
    private class HealthCheckTask implements Runnable {
        @Override
        public void run() {
            .lock();
            try {
                
                .debug("Kick off the HealthCheckTask thread");
                for (CachedProviderServiceInstance ist : getCacheServiceInstances().values()) {
                    if (ist.getServiceInstanceHealth() == null) {
                        continue;
                    }
                   
                    .debug(
                            "Check the Health for service={}, providerId={}.",
                            ist.getServiceName(), ist.getProviderId());
                    ist.isHealth = ist.getServiceInstanceHealth().isHealthy();
                }
            } catch (Exception e) {
                .error("ServiceInstanceHealth callback check failed."e);
            } finally{
                .unlock();
            }
        }
    }

    
The heartbeat task.
    private class HeartbeatTask implements Runnable {
        @Override
        public void run() {
            .lock();
            try {
                .debug("Kick off the heartbeat thread");
                List<ServiceInstanceHeartbeatserviceHBList = new ArrayList<ServiceInstanceHeartbeat>();
                for (CachedProviderServiceInstance cachedInstance : getCacheServiceInstances().values()) {
                    .debug("Service instance: {}."cachedInstance.toString());
                    if (cachedInstance.monitorEnabled && ..equals(cachedInstance.status)
                            && cachedInstance.isHealth) {
                        ServiceInstanceHeartbeat hb = new ServiceInstanceHeartbeat(
                                cachedInstance.getServiceName(),
                                cachedInstance.getProviderId());
                        serviceHBList.add(hb);
                    }
                }
                .debug(
                        "Send heartbeat for ServiceInstances, ServiceInstanceNumber={}.",
                        serviceHBList.size());
                if (serviceHBList.isEmpty()) {
                    return;
                }
                Map<StringServiceInstanceHeartbeatheartbeatMap = new HashMap<StringServiceInstanceHeartbeat>();
                for (ServiceInstanceHeartbeat instance : serviceHBList) {
                    String id = instance.getServiceName() + "-"
                            + instance.getProviderId();
                    heartbeatMap.put(idinstance);
                }
                Map<StringOperationResult<String>> operateResult = getServiceDirectoryClient()
                        .sendHeartBeat(heartbeatMap);
                if (operateResult != null) {
                    for (Entry<StringOperationResult<String>> entry : operateResult
                            .entrySet()) {
                        boolean result = entry.getValue().getResult();
                        if (result == false) {
                            ServiceInstanceHeartbeat instance = heartbeatMap
                                    .get(entry.getKey());
                            .error(
                                    "Send heartbeat failed, serviceName={}, providerId={}. {}.",
                                    new Object[] {
                                            instance.getServiceName(),
                                            instance.getProviderId(),
                                            entry.getValue().getError().getErrorMessage() });
                  }
                    }
                } else {
                    .error("No heartbeat response from Directory Server.");
                }
            } catch (Exception e) {
                .error("Send heartbeat failed."e);
            } finally{
                .unlock();
            }
        }
    }

    
The cached ProviderServiceInstance for the ServiceInstanceHealth and heartbeat.
    private static class CachedProviderServiceInstance {
        
The ServiceName of ProvidedServiceInstance.
        private final String serviceName;

        
The providerId of ProvidedServiceInstance.
        private final String providerId;

        
Whether the instance enabled Monitor in Service Directory.
        private boolean monitorEnabled = true;

        
The instance OperationalStatus.
        private OperationalStatus status;

        
The ServiceInstanceHealth callback of the ProvidedServiceInstance.
        private ServiceInstanceHealth healthCallback;

        
Store the ServiceInstanceHealth callback result. If the ServiceInstanceHealth is null, default to true;
        private boolean isHealth = true;

        
Constructor

Parameters:
serviceInstance the ProvidedServiceInstance.
        public CachedProviderServiceInstance(
                ProvidedServiceInstance serviceInstance) {
            this. = serviceInstance.getServiceName();
            this. = serviceInstance.getProviderId();
            this. = serviceInstance.isMonitorEnabled();
            this. = serviceInstance.getStatus();
        }

        
Set the ServiceInstanceHealth.

Parameters:
healthCallback the ServiceInstanceHealth.
        public void setServiceInstanceHealth(
                ServiceInstanceHealth healthCallback) {
            this. = healthCallback;
            this. = true;
        }

        
Get the ServiceInstanceHealth.

Returns:
the ServiceInstanceHealth callback.
            return ;
        }

        
Get the service name.

Returns:
the service name.
        public String getServiceName() {
            return ;
        }

        
Get the providerId.

Returns:
the providerId.
        public String getProviderId() {
            return ;
        }

        
Get the OperationalStatus.

Returns:
the OperationalStatus.
        public OperationalStatus getStatus() {
            return ;
        }

        
Set the OperationalStatus.

Parameters:
status the OperationalStatus.
        public void setStatus(OperationalStatus status) {
            this. = status;
        }

        
check whether it is monitor-enabled service instance

Returns:
true if monitor enabled.
        public boolean isMonitorEnabled() {
            return ;
        }

        
Set the service to be monitored.

Parameters:
monitor true or false.
        public void setMonitorEnabled(boolean monitor) {
            this. = monitor;
        }
        @Override
        public String toString() {
            return "serviceName=" +  + ", providerId=" +  + ", status=" +  +", monitor=" +  + ", isHealth=" + ;
        }
    }
    private static class ServiceInstanceId{
        private String serviceName;
        private String providerId;
        public ServiceInstanceId(String serviceNameString providerId){
            this. = serviceName;
            this. = providerId;
        }
        @Override
        public String toString() {
            return "serviceName=" +  + ", providerId=" + ;
        }
        @Override
        public boolean equals(Object obj) {
            if (obj != null && obj instanceof ServiceInstanceId) {
                ServiceInstanceId instance = (ServiceInstanceIdobj;
                return (instance.serviceName.equals() && instance.providerId
                        .equals());
            }
            return false;
        }
        @Override
        public int hashCode() {
            int result =  != null ? .hashCode() : 0;
            result = 31 * result +  != null ? .hashCode()
                    : 0;
            return result;
        }
    }
New to GrepCode? Check out our FAQ X