Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  package com.fasterxml.clustermate.service.cleanup;
  
  import java.util.List;
  
 
Helper class that handles details of background cleanup processing, related to data storage and expiration.
 
 public class CleanerUpper<K extends EntryKey, E extends StoredEntry<K>>
     implements RunnableStartAndStoppable
 {
     private final Logger LOG;
 
     /*
     /**********************************************************************
     /* Configuration
     /**********************************************************************
      */
 
     protected final SharedServiceStuff _stuff;
     protected final TimeSpan _delayBetweenCleanups;
     
     /*
     /**********************************************************************
     /* Related helper objects
     /**********************************************************************
      */
 
     protected final TimeMaster _timeMaster;
 
     protected final Stores<K,E> _stores;
    
    
Object that keeps track of observed cluster state.
 
     protected ClusterViewByServer _cluster;
 
     /*
     /**********************************************************************
     /* Other state
     /**********************************************************************
      */

    
Flag used for communicating need to shut down processing.
 
     protected final AtomicBoolean _shutdown = new AtomicBoolean(false);

    
State flag used to indicate clean shutdown.
 
     protected final AtomicBoolean _completed = new AtomicBoolean(false);

    
Marker we use to simplify checks for "is cleaner upper running right now".
 
     protected final AtomicBoolean _isRunning = new AtomicBoolean(false);

    
And for testing purposes let's also keep track of run count.
 
     protected final AtomicInteger _runCount = new AtomicInteger(0);
    
    
Actual clean up thread we use
 
     protected Thread _thread;
    
    
Timestamp of next time that cleanup tasks get triggered.
 
     protected AtomicLong _nextStartTime = new AtomicLong(0L);
    
    
Reference to currently active cleanup component; used for producing status descriptions.
 
     protected AtomicReference<CleanupTask<?>> _currentTask = new AtomicReference<CleanupTask<?>>();
 
     protected final CleanupTask<?>[] _tasks;
     
     /*
     /**********************************************************************
    /* Life-cycle
    /**********************************************************************
     */
    public CleanerUpper(SharedServiceStuff stuffStores<K,E> stores,
            ClusterViewByServer cluster,
            List<CleanupTask<?>> tasks)
    {
        this(stuffstoresclustertasksnull);
    }
    public CleanerUpper(SharedServiceStuff stuffStores<K,E> stores,
            ClusterViewByServer cluster,
            List<CleanupTask<?>> tasks,
            Logger logger)
    {
        if (logger == null) {
            logger = LoggerFactory.getLogger(getClass());
        }
         = logger;
         = stuff;
         = stuff.getTimeMaster();
         = stores;
         = cluster;
        // Important: start with LocalEntryCleaner (to try to avoid dangling files),
        // then do FileCleaner
         = tasks.toArray(new CleanupTask[tasks.size()]);
        for (CleanupTask<?> task : ) {
            task.init();
        }
    }
    
    @Override
    public synchronized void start()
    {
        if ( != null) {
            throw new IllegalStateException("CleanerUpper.start() called second time");
        }
        // let's wait 50% of minimum delay; typically 30 minutes
//        long delayMsecs = (_delayBetweenCleanups.toMilliseconds() / 2);
        long delayMsecs = 5000L;
        
        .set(.currentTimeMillis() + delayMsecs);
         = new Thread(this);
        .setDaemon(true);
        .start();
    }
    @Override
    public void prepareForStop() {
        // Let's mark the fact we are shutting down, but not yet terminate
        // on-going processes.
        // NOTE! Tasks are also given this marker so that they also know when
        // to bail out
        .set(true);
        CleanupTask<?> curr = .get();
        if (curr != null) {
            curr.prepareForStop();
        }
    }
    @Override
    public void stop()
    {
        if (!.getAndSet(true)) {
            .warn("CleanerUpper's 'shutdown' flag was not set when 'stop()' called; setting it now");
        }
        
        if (!.get() && ( != null)) {
            CleanupTask<?> curr = .get();
            // with actual task running, need to be bit more careful?
            if (curr != null) {
                .warn("CleanerUpper not complete when stop() called: will wait a bit first");
                boolean complete = false;
                long start = System.currentTimeMillis();
                // up to 3 seconds only
                try {
                    for (int i = 0; !complete && i < 60; ++i) {
                        Thread.sleep(50L);
                        complete = .get();
                    }
                } catch (InterruptedException e) { }
                if (!complete) {
                    long msecs = System.currentTimeMillis() - start;
                    curr = .get();
                    String desc = (curr == null) ? "NONE" : curr.toString();
                    .warn("CleanerUpper task '{}' still not complete after {} msec wait: will try Thread.interrupt() as last resort",
                            descmsecs);
                    .interrupt();
                }
            } else { // otherwise... should be fine
                .warn("CleanerUpper not complete when stop() called, but no task running: will try Thread.interrupt()");
                .interrupt();
            }
        }
    }
    /*
    /**********************************************************************
    /* Main loop
    /**********************************************************************
     */
    @Override
    public void run()
    {
        try {
            while (!.get()) {
                _runOnce();
            }
        } finally {
            .set(true);
        }
    }
    
    protected void _runOnce()
    {
        long delayMsecs = .get() - .currentTimeMillis();
        if (delayMsecs > 0L) {
            .info("Waiting up to {} until running cleanup tasks...", TimeMaster.timeDesc(delayMsecs));
            try {
                .sleep(delayMsecs);
            } catch (InterruptedException e) {
                final long msecs = .get() - .currentTimeMillis();
                .info("Interruped during wait for next runtime ({} before next run)", TimeMaster.timeDesc(msecs));
                return;
            }
        }
        final long startTime = .currentTimeMillis();
        // ok, run...
        .info("Starting cleanup tasks ({})".);
        .set(startTime + .getMillis());
        .set(true);
        for (CleanupTask<?> task : ) {
            if (.get()) {
//                if (!_stuff.isRunningTests()) {
                    .info("Stopping cleanup tasks due to shutdown");
//                }
                break;
            }
            .set(task);
            try {
                Object result = task.cleanUp();
                long took = .currentTimeMillis() - startTime;
                .info("Clean up task {} complete in {}, result: {}",
                        task.getClass().getName(), TimeMaster.timeDesc(took), result);
            } catch (Exception e) {
                .warn("Problems running clean up task of type "+task.getClass().getName()+": "+e.getMessage(), e);
            } finally {
                .set(null);
            }
        }
        .set(false);
        .addAndGet(1);
        long tookAll = .currentTimeMillis() - startTime;
        .info("Completed running of clean up tasks in {}", TimeMaster.timeDesc(tookAll));
    }
    /*
    /**********************************************************************
    /* Other methods
    /**********************************************************************
     */
    public boolean isRunning() {
        return .get();
    }
    public int getRunCount() {
        return .get();
    }
    
    
Method is overridden to provide somewhat readable description of current state, to be served externally.
    @Override
    public String toString()
    {
        CleanupTask<?> task = .get();
        if (task != null) {
            return "Current task: "+task.toString();
        }
        long msecs = .get() - .currentTimeMillis();
        if (msecs < 0L) {
            msecs = 0L;
        }
        return "Waiting for "+TimeMaster.timeDesc(msecs)+" until next cleanup round";
    }
New to GrepCode? Check out our FAQ X