Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /* This Source Code Form is subject to the terms of the Mozilla Public
   * License, v. 2.0. If a copy of the MPL was not distributed with this file,
   * You can obtain one at http://mozilla.org/MPL/2.0/. */
  
  package pt.webdetails.cda.cache;
  
 
 
 import  org.apache.commons.logging.Log;
 import  org.apache.commons.logging.LogFactory;
 import  org.pentaho.platform.engine.core.system.PentahoSystem;
 
 
 import  com.hazelcast.core.EntryEvent;
 import  com.hazelcast.core.EntryListener;
 import  com.hazelcast.core.Hazelcast;
 import  com.hazelcast.core.HazelcastInstance;
 import  com.hazelcast.core.IMap;
 import  com.hazelcast.core.LifecycleService;
 import  com.hazelcast.core.MapEntry;
 import  com.hazelcast.impl.base.DataRecordEntry;
 import  com.hazelcast.query.SqlPredicate;
 
Hazelcast implementation of CDA query cache
 
 public class HazelcastQueryCache extends ClassLoaderAwareCaller implements IQueryCache {
 
   private static final Log logger = LogFactory.getLog(HazelcastQueryCache.class);
   
   public static final String MAP_NAME = "cdaCache";
   public static final String AUX_MAP_NAME = "cdaCacheStats";
   
   private static final String PLUGIN_PATH = "system/" + . + "/";
   private static final String CACHE_CFG_FILE_HAZELCAST = "hazelcast.xml";
 
   private static final String GROUP_NAME = "cdc";
   private static HazelcastInstance hzInstance;
   private static LifecycleService lifeCycleService;
   
   private static long getTimeout = CdaPropertiesHelper.getIntProperty("pt.webdetails.cda.cache.getTimeout", 5);
 //  private static long putTimeout = CdaPropertiesHelper.getIntProperty("pt.webdetails.cda.cache.putTimeout", 5);
   private static TimeUnit timeoutUnit = .;
 //max consecutive timeouts
   private static int maxTimeouts = CdaPropertiesHelper.getIntProperty("pt.webdetails.cda.cache.maxTimeouts", 4);
   private static long cacheDisablePeriod = CdaPropertiesHelper.getIntProperty("pt.webdetails.cda.cache.disablePeriod", 5);
   private static boolean debugCache = CdaPropertiesHelper.getBoolProperty("pt.webdetails.cda.cache.debug"true);
 
   
   private static int timeoutsReached = 0;
   private static boolean active=true;
  
  

Returns:
main cache (will hold actual values)
 
   private static IMap<TableCacheKeyTableModelgetCache(){
     return getHazelcast().getMap();
   }
  
  

Returns:
used for holding extra info
 
   private static IMap<TableCacheKeyExtraCacheInfogetCacheStats(){
     return getHazelcast().getMap();
   }
   
   private static synchronized HazelcastInstance getHazelcast(){
     if( == null || !.isRunning()){
       .debug("finding hazelcast instance..");
       for(HazelcastInstance instance : Hazelcast.getAllHazelcastInstances()){
         .debug("found hazelcast instance [" + instance.getName() + "]");
         if(instance.getConfig().getGroupConfig().getName().equals()){
           .info( + " hazelcast instance found: [" + instance.getName() + "]");
            = instance;
            = .getLifecycleService();
           break;
         }
       }
       
       if( == null){
         .fatal("No valid hazelcast instance found.");
       }
    }
    return ;
  }
  
  public HazelcastQueryCache(){
    this(PentahoSystem.getApplicationContext().getSolutionPath( + ),true);
  }
  
  private HazelcastQueryCache(String cfgFileboolean superClient){
    super(Thread.currentThread().getContextClassLoader());
    init();
  }
  
  private static int incrTimeouts(){
    return ++;
  }
  //no problem if unsynched
  private static int resetTimeouts(){
    return =0;
  }
  
    
  private static void init()
  {  
    
    .info("CDA CDC Hazelcast INIT");
    //sync cache removals with cacheStats
    ClassLoader cdaPluginClassLoader = Thread.currentThread().getContextClassLoader();
    SyncRemoveStatsEntryListener syncRemoveStats = new SyncRemoveStatsEntryListenercdaPluginClassLoader );
    
    IMap<TableCacheKeyTableModelcache = getCache();
    
    cache.removeEntryListener(syncRemoveStats);
    cache.addEntryListener(syncRemoveStatsfalse);
    
    if(){
      .debug("Added logging entry listener");
      cache.addEntryListener(new LoggingEntryListener(cdaPluginClassLoader), false);
    }
  }
  
  public void shutdownIfRunning()
  {
    //let cdc handle this
  }
  
  public void putTableModel(TableCacheKey keyTableModel tableint ttlSecExtraCacheInfo info) {
    getCache().putAsync(keytable);
    info.setEntryTime(System.currentTimeMillis());
    info.setTimeToLive(ttlSec*1000);
    getCacheStats().putAsync(keyinfo);    
  }
  
  private <K,V> V getWithTimeout(K key, IMap<K,V> map){
    if(!return null;
    Future<V> future = map.getAsync(key);
    try{
      V result = future.get();
      resetTimeouts();
      return result;
    }
    catch(TimeoutException e){
      int nbrTimeouts = incrTimeouts();
      checkNbrTimeouts(nbrTimeouts);
      .error("Timeout " +  + " " +   + " expired fetching from " + 
          map.getName() + " (timeout#" + nbrTimeouts + ")" );
    } catch (InterruptedException e) {
      .error(e);
    } catch (ExecutionException e) {
      .error(e);
    }
    return null;
  }
  
  private void checkNbrTimeouts(int nbrTimeouts){
    if(nbrTimeouts > 0 && nbrTimeouts %  == 0){
      //too many consecutive timeouts may mean hazelcast is gettint more requests
      //than it can handle, give it some space
      .error("Too many timeouts, disabling for " +  + " seconds.");
      resetTimeouts();
       = false;
      new Thread(new Runnable(){
        @Override
        public void run() {
          try {
            Thread.sleep( * 1000);
          } catch (InterruptedException e) {
            .error(e);
          } catch (Exception e){
            .error(e);
          }
           = true;
        }
        
      }).start();
    }
  }
  
    try
    {
      ExtraCacheInfo info = getWithTimeout(keygetCacheStats());
      if(info != null)
      {
        //per instance ttl not supported by hazelcast, need to check manually
        if(info.getTimeToLive() > 0 && (info.getTimeToLive() + info.getEntryTime()) < System.currentTimeMillis())
        {
          .info("Cache element expired, removing from cache.");
          getCache().removeAsync(key);
          return null;
        }
        else {
          TableModel tm = getWithTimeout(keygetCache());
          if(tm == null) {
            .error("Cache stats out of sync! Removing element.");
            getCacheStats().removeAsync(key);
            return null;
          }
          .info("Table found in cache. Returning.");
          return tm;
        }
      }
      return null;
    } 
    catch(ClassCastException e)
    {//handle issue when map would return a dataRecordEntry instead of element type//TODO: hasn't been caught in a while, maybe we can drop this
      Object obj = getCache().get(key);
      .error("Expected TableModel in cache, found " + obj.getClass().getCanonicalName() + " instead.");
      if(obj instanceof DataRecordEntry)
      {
        DataRecordEntry drEntry = (DataRecordEntry) obj;
        .info("Cache holding DataRecordEntry, attempting recovery");
        Object val = drEntry.getValue();
        
        if(val instanceof TableModel)
        {
          TableModel tm = (TableModelval;
          .warn("TableModel found in record, attempting to replace cache entry..");
          getCache().replace(keytm);
          .info("Cache entry replaced.");
          return tm;
        }
        else {
          .error("DataRecordEntry in cache has value of unexpected type " + obj.getClass().getCanonicalName());
          .warn("Removing incompatible cache entry.");
          getCache().remove(key);
        }
      }
      return null;
    }
    catch (Exception e){
      if(e.getCause() instanceof IOException
      {//most likely a StreamCorruptedException
        .error("IO error while attempting to get key " + key + "(" + e.getCause().getMessage() + "), removing from cache!"e);
        getCache().removeAsync(key);
        return null;
      }
      else .error("Unexpected exception "e);
      return null;
    }
  }
  
  public void clearCache() {
    getCache().clear();
    getCacheStats().clear();
  }
  public boolean remove(TableCacheKey key) {
    return getCache().remove(key) != null;
  }
  public Iterable<TableCacheKeygetKeys() {
    return getCache().keySet();
  }
  
  public Iterable<TableCacheKeygetKeys(String cdaSettingsIdString dataAccessId)
  {
    return getCacheStats().keySet(new SqlPredicate("cdaSettingsId = " + cdaSettingsId + " AND dataAccessId = " + dataAccessId));   
  }
  
    return  getCacheStats().get(key);
  }
  
  
Synchronizes both maps' removals and evictions
  private static final class SyncRemoveStatsEntryListener extends ClassLoaderAwareCaller implements EntryListener<TableCacheKeyTableModel>  {
    
    public SyncRemoveStatsEntryListener(ClassLoader classLoader){
      super(classLoader); 
    }
    
    @Override
    public void entryAdded(EntryEvent<TableCacheKeyTableModelevent) {}//ignore
    @Override
    public void entryUpdated(EntryEvent<TableCacheKeyTableModelevent) {}//ignore
    
    @Override
    public void entryRemoved(final EntryEvent<TableCacheKeyTableModelevent
    {
      runInClassLoader(new Runnable(){
      
        public void run(){
          TableCacheKey key = event.getKey();
          .debug("entry removed, removing stats for query " + key);
          getCacheStats().remove(key);
        }
        
      });
    }
    @Override
    public void entryEvicted(final EntryEvent<TableCacheKeyTableModelevent) {
      runInClassLoader(new Runnable(){
        
        public void run(){
          TableCacheKey key = event.getKey();
          .debug("entry evicted, removing stats for query " + key);
          getCacheStats().remove(key);
        }
        
      });
    }
    
    //used for listener removal
    @Override
    public boolean equals(Object other){
      return other instanceof SyncRemoveStatsEntryListener;
    }
  }
  
  static class LoggingEntryListener extends ClassLoaderAwareCaller implements EntryListener<TableCacheKeyTableModel> {
    
    private static final Log logger = LogFactory.getLog(HazelcastQueryCache.class);
    
    public LoggingEntryListener(ClassLoader classLoader){
        super(classLoader);
    }
    
    @Override
    public void entryAdded(final EntryEvent<TableCacheKeyTableModelevent) {
      runInClassLoader(new Runnable() {
        public void run() {
          .debug("CDA ENTRY ADDED " + event);
        }
      });
    }
    @Override
    public void entryRemoved(final EntryEvent<TableCacheKeyTableModelevent) {
      runInClassLoader(new Runnable() {
        public void run() {
          .debug("CDA ENTRY REMOVED " + event);
        }
      });
    }
    @Override
    public void entryUpdated(final EntryEvent<TableCacheKeyTableModelevent) {
      runInClassLoader(new Runnable() {
        public void run() {
          .debug("CDA ENTRY UPDATED " + event);
        }
      });
    }
    @Override
    public void entryEvicted(final EntryEvent<TableCacheKeyTableModelevent) {
      runInClassLoader(new Runnable() {
        public void run() {
          .debug("CDA ENTRY EVICTED " + event);
        }
      });
    }
    
  }
  public int removeAll(final String cdaSettingsIdfinal String dataAccessId) {
    if(cdaSettingsId == null){
      int size = getCache().size();
      getCache().clear();
      return size;
    }
    
    try {
      return callInClassLoader(new Callable<Integer>(){
      
        public Integer call(){
          int size=0;
          Iterable<Entry<TableCacheKeyExtraCacheInfo>> entries = getCacheStatsEntries(cdaSettingsIddataAccessId);
          if(entries != nullfor(Entry<TableCacheKeyExtraCacheInfoentryentries){
            getCache().remove(entry.getKey());
            size++;
          }
          return size;
        }
      });
    } catch (Exception e) {
      .error("Error calling removeAll"e);
      return -1;
    }
  }
    ExtraCacheInfo info = getCacheStats().get(key);
    MapEntry<TableCacheKey,TableModelentry = getCache().getMapEntry(key);
    
    CacheElementInfo ceInfo = new CacheElementInfo();
    ceInfo.setAccessTime(entry.getLastAccessTime());
    ceInfo.setByteSize(entry.getCost());
    ceInfo.setInsertTime(entry.getLastUpdateTime());// getCreationTime()
    ceInfo.setKey(key);
    ceInfo.setHits(entry.getHits());
    
    ceInfo.setRows(info.getNbrRows());
    ceInfo.setDuration(info.getQueryDurationMs());
    return ceInfo;
  }
  
  
(Make sure right class loader is set when accessing the iterator)

Parameters:
cdaSettingsId
dataAccessId
Returns:
  public Iterable<ExtraCacheInfogetCacheEntryInfo(String cdaSettingsIdString dataAccessId)
  {
    return getCacheStats().values(new SqlPredicate("cdaSettingsId = " + cdaSettingsId + ((dataAccessId != null)? " AND dataAccessId = " + dataAccessId : "")));
  }

  
(Make sure right class loader is set when accessing the iterator)

Parameters:
cdaSettingsId
dataAccessId
Returns:
  public Iterable<Entry<TableCacheKeyExtraCacheInfo>> getCacheStatsEntries(final String cdaSettingsId,final String dataAccessId)
  {
    return getCacheStats().entrySet(new SqlPredicate("cdaSettingsId = " + cdaSettingsId + ((dataAccessId != null)? " AND dataAccessId = " + dataAccessId : "")));
  }
  
New to GrepCode? Check out our FAQ X