Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
/*
 * Copyright 2014 Cisco Systems, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
 
 
 
 
 package com.cisco.oss.foundation.directory.impl;
 
 // NOTE(review): imports for SLF4J and project types (Logger, ModelService, OperationalStatus, ...)
 // are not visible in this extract; restore them from the original source.
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import static com.cisco.oss.foundation.directory.ServiceDirectory.getServiceDirectoryConfig;
 import static com.cisco.oss.foundation.directory.utils.JsonSerializer.*;

It is the DirectoryLookupService with client-side Cache. It caches ServiceInstance for quick lookup and provides the cache sync function to sync the latest changes of the cached ServiceInstances.
 
 public class CachedDirectoryLookupService extends DirectoryLookupService implements Stoppable {
 
     private static final Logger LOGGER = LoggerFactory
             .getLogger(CachedDirectoryLookupService.class);
 
     // Set this log to DEBUG to enable Service Cache dump in LookupManager.
     // It will dump the whole ServiceCache to log file when the Logger Changed first time,
     // and every time the Service Cache has new update.
     private static final Logger CacheDumpLogger = LoggerFactory.getLogger("com.cisco.oss.foundation.directory.cache.dump");

    
The LookupManager cache sync executor kickoff delay time property name in seconds.
 
     public static final String SD_API_CACHE_SYNC_DELAY_PROPERTY = "com.cisco.oss.foundation.directory.cache.sync.delay";

    
The default delay time of LookupManager cache sync executor kickoff.
 
     public static final int SD_API_CACHE_SYNC_DELAY_DEFAULT = 1;

    
The LookupManager cache sync interval property name in seconds.
 
     public static final String SD_API_CACHE_SYNC_INTERVAL_PROPERTY = "com.cisco.oss.foundation.directory.cache.sync.interval";

    
The default LookupManager cache sync interval value.
 
     public static final int SD_API_CACHE_SYNC_INTERVAL_DEFAULT = 10;


    
ScheduledExecutorService to sync cache.
 
     private final ScheduledExecutorService syncService;

    
Internal cache that maps the service name to a list of service instances.
 
     private final ConcurrentHashMap<StringModelServicecache = new ConcurrentHashMap<String,ModelService>();

    
Internal cache that maps the metadata key name to a list of service instances.
            = new ConcurrentHashMap<StringModelMetadataKey>();

    
Mark whether component is started.
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    
Constructor.

Parameters:
directoryServiceClient the DirectoryServiceClient.
    public CachedDirectoryLookupService(DirectoryServiceClient directoryServiceClient) {
        super(directoryServiceClient);
         = Executors
                .newSingleThreadScheduledExecutor(new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("SD_Cache_Sync_Task");
                        t.setDaemon(true);
                        return t;
                    }
                });
    }

    
Start the CachedDirectoryLookupService. It is thread safe.
    @Override
    public void start(){
        if (.compareAndSet(false,true)){
            initCacheSyncTask();
        }
    }

    
Stop the CachedDirectoryLookupService. It is thread safe.
    @Override
    public void stop(){
        if (.compareAndSet(true,false)) {
            this..shutdown();
            getCache().clear();
            getMetadataKeyCache().clear();
        }
    }

    
Get the ModelService. It will query the cache first. If not found in the cache, the service will be added to the cache.

Parameters:
serviceName the Service name.
Returns:
the ModelService.
    @Override
    protected ModelService getModelService(String serviceName){
        ModelService service = getCache().get(serviceName);
        if (service == null) {
            getCache().putIfAbsent(serviceName,super.getModelService(serviceName));
            service = getCache().get(serviceName);
        }
        return service;
    }

    
Get ModelMetadataKey, which is an object holding a list of service instances that contain the key name in the service metadata. It will query the cache first. If no match is found in the cache, the metadata key will be added to the cache.

Parameters:
keyName the metadata key name.
Returns:
the ModelMetadataKey.
    @Override
    protected ModelMetadataKey getModelMetadataKey(String keyName){
        ModelMetadataKey key = getMetadataKeyCache().get(keyName);
        if (key == null) {
            getMetadataKeyCache().putIfAbsent(keyNamesuper.getModelMetadataKey(keyName));
            key = getMetadataKeyCache().get(keyName);
        }
        return key;
    }

    
initialization of the CacheSyncTask
    private void initCacheSyncTask(){
        int delay = getServiceDirectoryConfig().getInt(
                ,
                );
        int interval = getServiceDirectoryConfig().getInt(
                ,
                );
                delayinterval.);
    }

    
Get the ServiceDirectoryCache that caches metadata key map. It is thread safe.

Returns:
the ServiceDirectoryCache.
        return ;
    }

    
Get the ServiceDirectoryCache that caches Services. It is thread safe.

Returns:
the ServiceDirectoryCache.
        return this.;
    }

    
Get the ModelService List for cache sync. The ModelService doesn't contain the referenced ModelServiceInstances.

Returns:
the ModelService List.
        List<ModelServiceallServices = new ArrayList<ModelService>();
        allServices.addAll(this..values());
        List<ModelServicesyncServices = new ArrayList<ModelService>();
        for(ModelService service : allServices){
            ModelService syncService = new ModelService(service.getName(), service.getId(), service.getCreateTime());
            syncServices.add(syncService);
        }
        return syncServices;
    }

    
Get the ModelMetadataKey List for cache sync. The ModelMetadataKey doesn't contain the referenced ModelServiceInstances.

Returns:
the ModelMetadataKey List.
        List<ModelMetadataKeyallKeys = new ArrayList<ModelMetadataKey>();
        allKeys.addAll(this..values());
        List<ModelMetadataKeysyncKeys = new ArrayList<ModelMetadataKey>();
        for(ModelMetadataKey service : allKeys){
            ModelMetadataKey syncKey = new ModelMetadataKey(service.getName(), service.getId(), service.getModifiedTime(), service.getCreateTime());
            syncKeys.add(syncKey);
        }
        return syncKeys;
    }

    
Get the changed list for the MetadataKey from the server.

Parameters:
keyMap the MetadataKey list.
Returns:
the ModelMetadataKey list that has been changed.
        return this.getDirectoryServiceClient().getChangedMetadataKeys(keyMap);
    }

    
Get the changed Services list from the server.

Parameters:
serviceMap the Service list.
Returns:
the Service list that has been changed.
Throws:
com.cisco.oss.foundation.directory.exception.ServiceException
        return this.getDirectoryServiceClient().getChangedServices(serviceMap);
    }

    
Dump the ServiceCache to CacheDumpLogger Logger.

Returns:
true if dump complete.
    private boolean dumpCache(){
        if (.isDebugEnabled()) {
            try {
                List<ModelServiceservices = new ArrayList<ModelService>();
                services.addAll(getCache().values());
                StringBuilder sb = new StringBuilder();
                sb.append("LookupManager dumpped Service Cache at: ").append(System.currentTimeMillis()).append("\n");
                for (ModelService service : services) {
                    sb.append(new String(serialize(service))).append("\n");
                }
                .debug(sb.toString());
            } catch (Exception e) {
                .warn("Dump Service Cache failed. Set Logger {} to INFO to disable this message.",
                            .getName());
                if (.isTraceEnabled()) {
                    .trace("Dump Service Cache failed. "e);
                }
                return false;
            }
            return true;
        }
        return false;
    }

    
The Runnable for Cache Sync.
    private class CacheSyncTask implements Runnable{
        private boolean lastCacheDump = false;
        public CacheSyncTask(){
        }
        @Override
        public void run() {
            try{
                List<ModelMetadataKeykeys = .getAllMetadataKeysForSync();
                if(keys.size() > 0){
                    Map<StringModelMetadataKeykeyMap = new HashMap<StringModelMetadataKey>();
                    for(ModelMetadataKey key : keys){
                        keyMap.put(key.getName(), key);
                    }
                    Map<StringOperationResult<ModelMetadataKey>> deltaKeys = .getChangedMetadataKeys(keyMap);
                    if(deltaKeys != null){
                        for(Entry<StringOperationResult<ModelMetadataKey>> deltaKey : deltaKeys.entrySet()){
                            String keyName = deltaKey.getKey();
                            OperationResult<ModelMetadataKeyresult = deltaKey.getValue();
                            if(result.getResult()){
                                ModelMetadataKey newKey = result.getobject();
                                if(newKey != null){
                                    .getMetadataKeyCache().put(keyNamenewKey);
                                    .info("Update the ModelMetadataKey in cache, keyName={}."keyName);
                                }
                            } else {
                                .error("Cache sync ModelMetadataKey failed, keyName={}. {}.",
                                            keyNameresult.getError().getErrorMessage());
                            }
                        }
                    } else {
                        .debug("No MetadataKey is changed.");
                    }
                } else {
                    .debug("No MetadataKey in the cache, skip cache sync.");
                }
            }catch(Exception e){
                .error("Sync ModelMetadataKey cache from ServiceDirectory Server failed. "e);
            }
            try{
                boolean cacheUpdated = false;
                List<ModelServiceservices = .getAllServicesForSync();
                Map<StringModelServiceserviceMap = new HashMap<StringModelService>();
                for(ModelService service : services){
                    serviceMap.put(service.getName(), service);
                }
                Map<StringOperationResult<ModelService>> deltaSvcs = .getChangedServices(serviceMap);
                if(deltaSvcs != null){
                    for(Entry<StringOperationResult<ModelService>> deltaService : deltaSvcs.entrySet()){
                        String serviceName = deltaService.getKey();
                        OperationResult<ModelServiceresult = deltaService.getValue();
                        if(result.getResult()){
                            ModelService newService = result.getobject();
                            ModelService oldService = .getCache().get(serviceName);
                            if(newService != null){
                                cacheUpdated = true;
                                .getCache().put(serviceNamenewService);
                                .info("Update the ModelService in cache, serviceName={}."serviceName );
                            }
                            onServiceChanged(newServiceoldService);
                        } else {
                            .error("Cache sync ModelService failed, serviceName={}. {}.",
                                            serviceNameresult.getError().getErrorMessage());
                        }
                    }
                } else {
                    .debug("No Service is changed.");
                }
                if (cacheUpdated ||  == false) {
                     = .dumpCache();
                }
            }catch(Exception e){
                .error("Sync ModelService cache from ServiceDirectory Server failed. "e);
            }
        }
        
        private void onServiceChanged(ModelService newServiceModelService oldService){
            if(newService == null || oldService == null || newService == oldService){
                return;
            }
            List<ModelServiceInstanceoldInstances = oldService.getServiceInstances();
            List<ModelServiceInstancenewInstances = newService.getServiceInstances();
            if(newInstances == null || newInstances.size() == 0){
                if(oldInstances != null){
                    for(ModelServiceInstance model : oldInstances){
                        if (model.getStatus().equals(.)) {
                            //Change the status to DOWN before the notification when unregistering a running instance
                            model.setStatus(.);
                            .onServiceInstanceUnavailable(ServiceInstanceUtils.toServiceInstance(model));
                        }
                    }
                }
            } else {
                if(oldInstances == null || oldInstances.size() == 0){
                    for(ModelServiceInstance model : newInstances){
                        if (model.getStatus().equals(.)) {
                            .onServiceInstanceAvailable(ServiceInstanceUtils.toServiceInstance(model));
                        }
                    }
                } else {
                    // Loop through all instances (added, deleted, changed) and send the proper notifications
                    // Can not operate directly on newInstances or oldIntances since it will remove the item from cache
                    List<ModelServiceInstancenewTmp = new ArrayList<ModelServiceInstance>();
                    List<ModelServiceInstanceoldTmp = new ArrayList<ModelServiceInstance>();
                    for (ModelServiceInstance model : oldInstances) {
                        oldTmp.add(model);
                    }
                    for (ModelServiceInstance model : newInstances) {
                        newTmp.add(model);
                    }
                    
                    Iterator<ModelServiceInstanceitnew = newTmp.iterator();
                    Iterator<ModelServiceInstanceitold = oldTmp.iterator();
                    ModelServiceInstance curnew = null;
                    ModelServiceInstance curold = null;
                    while (itnew.hasNext()) {
                        curnew = itnew.next();
                        while (itold.hasNext()) {
                            curold = itold.next();
                            if (curnew != null && curold != null && curnew.getInstanceId().equals(curold.getInstanceId())) {
                                
                                if(curnew.getStatus().equals(.) && curold.getStatus().equals(.)) { 
                                    .onServiceInstanceAvailable(ServiceInstanceUtils.toServiceInstance(curnew));
                                } 
                                if (curnew.getStatus().equals(.) && curold.getStatus().equals(.)) {
                                    .onServiceInstanceUnavailable(ServiceInstanceUtils.toServiceInstance(curnew));
                                }
                                // Check if the service instance metadata has been changed
                                if (curnew.getMetadata() != null && curold.getMetadata() != null && !curnew.getMetadata().equals(curold.getMetadata())) {
                                    .onServiceInstanceChanged(ServiceInstanceUtils.toServiceInstance(curnew));
                                }
                                
                                itnew.remove();
                                itold.remove();
                            }
                        }
                    }
                    
                    for (ModelServiceInstance model : oldTmp) {
                        if (model.getStatus().equals(.)) {
                            //Change the status to DOWN before the notification when unregistering a running instance
                            model.setStatus(.);
                            .onServiceInstanceUnavailable(ServiceInstanceUtils.toServiceInstance(model));
                        }
                    }
                    
                    for (ModelServiceInstance model : newTmp) {
                        if (model.getStatus().equals(.)) {
                            .onServiceInstanceAvailable(ServiceInstanceUtils.toServiceInstance(model));
                        }
                    }
                }
            }
        }
    }
New to GrepCode? Check out our FAQ X