Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright (C) 2011 The Guava Authors
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 package com.google.common.util.concurrent;
 
 
 
 import  javax.annotation.concurrent.GuardedBy;

Base class for services that can implement startUp and shutDown but while in the "running" state need to perform a periodic task. Subclasses can implement startUp, shutDown and also a runOneIteration method that will be executed periodically.

This class uses the ScheduledExecutorService returned from executor to run the startUp and shutDown methods and also uses that service to schedule the runOneIteration that will be executed periodically as specified by its Scheduler. When this service is asked to stop via stop or stopAndWait, it will cancel the periodic task (but not interrupt it) and wait for it to stop before running the shutDown method.

Subclasses are guaranteed that the life cycle methods (runOneIteration, .startUp and shutDown) will never run concurrently. Notably, if any execution of runOneIteration takes longer than its schedule defines, then subsequent executions may start late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify shared state without additional synchronization necessary for visibility to later executions of the life cycle methods.

Usage Example

Here is a sketch of a service which crawls a website and uses the scheduling capabilities to rate limit itself.
 class CrawlingService extends AbstractScheduledService {
   private Set<Uri> visited;
   private Queue<Uri> toCrawl; 
   protected void startUp() throws Exception {
     toCrawl = readStartingUris();
   
 
   protected void runOneIteration() throws Exception {
     Uri uri = toCrawl.remove();
     Collection<Uri> newUris = crawl(uri);
     visited.add(uri);
     for (Uri newUri : newUris) {
       if (!visited.contains(newUri)) { toCrawl.add(newUri); }
     }
   }
   
   protected void shutDown() throws Exception {
     saveUris(toCrawl);
   }
 
   protected Scheduler scheduler() {
     return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS);
   }
 }}
This class uses the life cycle methods to read in a list of starting URIs and save the set of outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to rate limit the number of queries we perform.

Author(s):
Luke Sandberg
Since:
11.0
 
 public abstract class AbstractScheduledService implements Service {
   private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName());
  
  
A scheduler defines the policy for how the AbstractScheduledService should run its task.

Consider using the newFixedDelaySchedule and newFixedRateSchedule factory methods, these provide Scheduler instances for the common use case of running the service with a fixed schedule. If more flexibility is needed then consider subclassing CustomScheduler.

Author(s):
Luke Sandberg
Since:
11.0
  public abstract static class Scheduler {
    
Returns a Scheduler that schedules the task using the ScheduledExecutorService.scheduleWithFixedDelay method.

Parameters:
initialDelay the time to delay first execution
delay the delay between the termination of one execution and the commencement of the next
unit the time unit of the initialDelay and delay parameters
    public static Scheduler newFixedDelaySchedule(final long initialDelayfinal long delay
        final TimeUnit unit) {
      return new Scheduler() {
        @Override
        public Future<?> schedule(AbstractService serviceScheduledExecutorService executor,
            Runnable task) {
          return executor.scheduleWithFixedDelay(taskinitialDelaydelayunit);
        } 
      };
    }

    
Returns a Scheduler that schedules the task using the ScheduledExecutorService.scheduleAtFixedRate method.

Parameters:
initialDelay the time to delay first execution
period the period between successive executions of the task
unit the time unit of the initialDelay and period parameters
    public static Scheduler newFixedRateSchedule(final long initialDelayfinal long period
        final TimeUnit unit) {
      return new Scheduler() {
        @Override
        public Future<?> schedule(AbstractService serviceScheduledExecutorService executor,
            Runnable task) {
          return executor.scheduleAtFixedRate(taskinitialDelayperiodunit);
        }
      };
    }
    
    
Schedules the task to run on the provided executor on behalf of the service.
    abstract Future<?> schedule(AbstractService serviceScheduledExecutorService executor
        Runnable runnable);
    
    private Scheduler() {}
  }
  
  /* use AbstractService for state management */
  private final AbstractService delegate = new AbstractService() {
    
    // A handle to the running task so that we can stop it when a shutdown has been requested.
    // These two fields are volatile because their values will be accessed from multiple threads.
    private volatile Future<?> runningTask;
    private volatile ScheduledExecutorService executorService;
    
    // This lock protects the task so we can ensure that none of the template methods (startUp, 
    // shutDown or runOneIteration) run concurrently with one another.
    private final ReentrantLock lock = new ReentrantLock();
    
    private final Runnable task = new Runnable() {
      @Override public void run() {
        .lock();
        try {
          AbstractScheduledService.this.runOneIteration();
        } catch (Throwable t) {
          try {
            shutDown();
          } catch (Exception ignored) {
            .log(.
                "Error while attempting to shut down the service after failure."ignored);
          }
          notifyFailed(t);
          throw Throwables.propagate(t);
        } finally {
          .unlock();
        }
      }
    };
    
    @Override protected final void doStart() {
       = executor();
      .execute(new Runnable() {
        @Override public void run() {
          .lock();
          try {
            startUp();
             = scheduler().schedule();
            notifyStarted();
          } catch (Throwable t) {
            notifyFailed(t);
            throw Throwables.propagate(t);
          } finally {
            .unlock();
          }
        }
      });
    }
    @Override protected final void doStop() {
      .cancel(false); 
      .execute(new Runnable() {
        @Override public void run() {
          try {
            .lock();
            try {
              if (state() != .) {
                // This means that the state has changed since we were scheduled.  This implies that
                // an execution of runOneIteration has thrown an exception and we have transitioned
                // to a failed state, also this means that shutDown has already been called, so we
                // do not want to call it again.
                return;
              }
              shutDown();
            } finally {
              .unlock();
            }
            notifyStopped();
          } catch (Throwable t) {
            notifyFailed(t);
            throw Throwables.propagate(t);
          }
        }
      });
    }
  };
  
  
Constructor for use by subclasses.
  protected AbstractScheduledService() {}

  
Run one iteration of the scheduled task. If any invocation of this method throws an exception, the service will transition to the Service.State.FAILED state and this method will no longer be called.
  protected abstract void runOneIteration() throws Exception;

  
Start the service.

By default this method does nothing.

  protected void startUp() throws Exception {}

  
Stop the service. This is guaranteed not to run concurrently with runOneIteration.

By default this method does nothing.

  protected void shutDown() throws Exception {}

  
Returns the Scheduler object used to configure this service. This method will only be called once.
  protected abstract Scheduler scheduler();
  
  
Returns the ScheduledExecutorService that will be used to execute the startUp, runOneIteration and shutDown methods. If this method is overridden the executor will not be shutdown when this service terminates or fails. Subclasses may override this method to supply a custom ScheduledExecutorService instance. This method is guaranteed to only be called once.

By default this returns a new ScheduledExecutorService with a single thread thread pool that sets the name of the thread to the service name. Also, the pool will be shut down when the service terminates or fails.

    final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory() {
          @Override public Thread newThread(Runnable runnable) {
            return MoreExecutors.newThread(serviceName(), runnable);
          }
        });
    // Add a listener to shutdown the executor after the service is stopped.  This ensures that the
    // JVM shutdown will not be prevented from exiting after this service has stopped or failed.
    // Technically this listener is added after start() was called so it is a little gross, but it
    // is called within doStart() so we know that the service cannot terminate or fail concurrently
    // with adding this listener so it is impossible to miss an event that we are interested in.
    addListener(new Listener() {
      @Override public void starting() {}
      @Override public void running() {}
      @Override public void stopping(State from) {}
      @Override public void terminated(State from) {
        executor.shutdown();
      }
      @Override public void failed(State fromThrowable failure) {
        executor.shutdown();
      }}, MoreExecutors.sameThreadExecutor());
    return executor;
  }

  
Returns the name of this service. AbstractScheduledService may include the name in debugging output.

Since:
14.0
  protected String serviceName() {
    return getClass().getSimpleName();
  }
  
  @Override public String toString() {
    return serviceName() + " [" + state() + "]";
  }
  // We override instead of using ForwardingService so that these can be final.
  @Override public final ListenableFuture<Statestart() {
    return .start();
  }
  @Override public final State startAndWait() {
    return .startAndWait();
  }
  @Override public final boolean isRunning() {
    return .isRunning();
  }
  @Override public final State state() {
    return .state();
  }
  @Override public final ListenableFuture<Statestop() {
    return .stop();
  }
  @Override public final State stopAndWait() {
    return .stopAndWait();
  }
  
  

Since:
13.0
  @Override public final void addListener(Listener listenerExecutor executor) {
    .addListener(listenerexecutor);
  }
  
  

Since:
14.0
  @Override public final Throwable failureCause() {
    return .failureCause();
  }
  
  
A Scheduler that provides a convenient way for the AbstractScheduledService to use a dynamically changing schedule. After every execution of the task, assuming it hasn't been cancelled, the getNextSchedule method will be called.

Author(s):
Luke Sandberg
Since:
11.0
 
  @Beta
  public abstract static class CustomScheduler extends Scheduler {

    
A callable class that can reschedule itself using a CustomScheduler.
    private class ReschedulableCallable extends ForwardingFuture<Voidimplements Callable<Void> {
      
      
The underlying task.
      private final Runnable wrappedRunnable;
      
      
The executor on which this Callable will be scheduled.
      private final ScheduledExecutorService executor;
      
      
The service that is managing this callable. This is used so that failure can be reported properly.
      private final AbstractService service;
      
      
This lock is used to ensure safe and correct cancellation, it ensures that a new task is not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to ensure that it is assigned atomically with being scheduled.
 
      private final ReentrantLock lock = new ReentrantLock();
      
      
The future that represents the next execution of this task
      @GuardedBy("lock")
      private Future<VoidcurrentFuture;
      
          Runnable runnable) {
        this. = runnable;
        this. = executor;
        this. = service;
      }
      
      @Override
      public Void call() throws Exception {
        .run();
        reschedule();
        return null;
      }

      
Atomically reschedules this task and assigns the new future to currentFuture.
      public void reschedule() {
        // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that
        // cancel calls cancel on the correct future. 2. we want to make sure that the assignment
        // to currentFuture doesn't race with itself so that currentFuture is assigned in the 
        // correct order.
        .lock();
        try {
          if ( == null || !.isCancelled()) {
            final Schedule schedule = CustomScheduler.this.getNextSchedule();
             = .schedule(thisschedule.delayschedule.unit);
          }
        } catch (Throwable e) {
          // If an exception is thrown by the subclass then we need to make sure that the service
          // notices and transitions to the FAILED state.  We do it by calling notifyFailed directly
          // because the service does not monitor the state of the future so if the exception is not
          // caught and forwarded to the service the task would stop executing but the service would
          // have no idea.
          .notifyFailed(e);
        } finally {
          .unlock();
        }
      }
      
      // N.B. Only protect cancel and isCancelled because those are the only methods that are 
      // invoked by the AbstractScheduledService.
      @Override
      public boolean cancel(boolean mayInterruptIfRunning) {
        // Ensure that a task cannot be rescheduled while a cancel is ongoing.
        .lock();
        try {
          return .cancel(mayInterruptIfRunning);
        } finally {
          .unlock();
        }
      }
      @Override
      protected Future<Voiddelegate() {
        throw new UnsupportedOperationException("Only cancel is supported by this future");
      }
    }
    
    @Override
    final Future<?> schedule(AbstractService serviceScheduledExecutorService executor
        Runnable runnable) {
      ReschedulableCallable task = new ReschedulableCallable(serviceexecutorrunnable);
      task.reschedule();
      return task;
    }
    
    
A value object that represents an absolute delay until a task should be invoked.

Author(s):
Luke Sandberg
Since:
11.0
    @Beta
    protected static final class Schedule {
      
      private final long delay;
      private final TimeUnit unit;
      
      

Parameters:
delay the time from now to delay execution
unit the time unit of the delay parameter
      public Schedule(long delayTimeUnit unit) {
        this. = delay;
        this. = Preconditions.checkNotNull(unit);
      }
    }
    
    
Calculates the time at which to next invoke the task.

This is guaranteed to be called immediately after the task has completed an iteration and on the same thread as the previous execution of AbstractScheduledService.runOneIteration.

Returns:
a schedule that defines the delay before the next execution.
    protected abstract Schedule getNextSchedule() throws Exception;
  }
New to GrepCode? Check out our FAQ X