 /*
  * Copyright (C) 2012 The Guava Authors
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.google.common.util.concurrent;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Predicates.instanceOf;
 import static com.google.common.base.Predicates.not;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 
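 import com.google.common.annotations.Beta;
 import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.util.concurrent.Service.State;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.Immutable;
 import javax.inject.Inject;
 import javax.inject.Singleton;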

A manager for monitoring and controlling a set of services. This class provides methods for starting, stopping and inspecting a collection of services. Additionally, users can monitor state transitions with the listener mechanism.

While it is recommended that service lifecycles be managed via this class, state transitions initiated via other mechanisms do not impact the correctness of its methods. For example, if the services are started by some mechanism besides startAsync, the listeners will be invoked when appropriate and awaitHealthy will still work as expected.

Here is a simple example of how to use a ServiceManager to start a server.

 class Server {
   public static void main(String[] args) {
     Set<Service> services = ...;
     // final so that the shutdown hook below can refer to it
     final ServiceManager manager = new ServiceManager(services);
     manager.addListener(new Listener() {
         public void stopped() {}
         public void healthy() {
           // Services have been initialized and are healthy, start accepting requests...
         }
         public void failure(Service service) {
           // Something failed, at this point we could log it, notify a load balancer, or take
           // some other action.  For now we will just exit.
           System.exit(1);
         }
       },
       MoreExecutors.sameThreadExecutor());

     Runtime.getRuntime().addShutdownHook(new Thread() {
       public void run() {
         // Give the services 5 seconds to stop to ensure that we are responsive to shutdown
         // requests.
         try {
           manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
         } catch (TimeoutException timeout) {
           // stopping timed out
         }
       }
     });
     manager.startAsync();  // start all the services asynchronously
   }
 }

This class uses the ServiceManager's methods to start all of its services, to respond to service failure and to ensure that when the JVM is shutting down all the services are stopped.

Author(s):
Luke Sandberg
Since:
14.0
@Singleton
public final class ServiceManager {
  private static final Logger logger = Logger.getLogger(ServiceManager.class.getName());
  
  
A listener for the aggregate state changes of the services that are under management. Users that need to listen to more fine-grained events (such as when each particular service starts, or terminates), should attach service listeners to each individual service.

Author(s):
Luke Sandberg
Since:
15.0 (present as an interface in 14.0)
  @Beta  // Should come out of Beta when ServiceManager does
  public abstract static class Listener {
    
Called when the service initially becomes healthy.

This will be called at most once after all the services have entered the running state. If any services fail during start up or fail/terminate before all other services have started running then this method will not be called.

    public void healthy() {}
    
    
Called when all of the component services have reached a terminal state, either terminated or failed.
    public void stopped() {}
    
    
Called when a component service has failed.

Parameters:
service The service that failed.
    public void failure(Service service) {}
  }
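
The rules above for when each aggregate callback fires can be modelled with two counters. The following is a minimal sketch using only the JDK, not the Guava implementation; the class name AggregateTracker and its methods are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stdlib-only model of the aggregate callbacks; not Guava code. */
final class AggregateTracker {
  private final int total;
  private int unstarted;  // services that have not finished starting
  private int unstopped;  // services that have not reached a terminal state
  final List<String> events = new ArrayList<>();

  AggregateTracker(int total) {
    this.total = total;
    this.unstarted = total;
    this.unstopped = total;
  }

  /** A service finished starting; running is true only if it actually reached RUNNING. */
  synchronized void finishedStarting(boolean running) {
    unstarted--;
    // healthy fires only if every service has started and none has stopped yet.
    if (running && unstarted == 0 && unstopped == total) {
      events.add("healthy");
    }
  }

  /** A service reached a terminal state; failed distinguishes FAILED from TERMINATED. */
  synchronized void reachedTerminalState(String name, boolean failed) {
    if (failed) {
      events.add("failure:" + name);
    }
    unstopped--;
    if (unstopped == 0) {
      events.add("stopped");
    }
  }
}
```

In this model a failure during startup suppresses the healthy event, because the stopped counter has already moved by the time the last service finishes starting.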
  
  
An encapsulation of all of the state that is accessed by the service listeners. This is extracted into its own object so that ServiceListener could be made static and its instances can be safely constructed and added in the ServiceManager constructor without having to close over the partially constructed ServiceManager instance (i.e. avoid leaking a pointer to this).
  private final ServiceManagerState state;
  private final ImmutableMap<Service, ServiceListener> services;
  
  
Constructs a new instance for managing the given services.

Parameters:
services The services to manage
Throws:
IllegalArgumentException if not all services are new or if there are any duplicate services.
  public ServiceManager(Iterable<? extends Service> services) {
    ImmutableList<Service> copy = ImmutableList.copyOf(services);
    if (copy.isEmpty()) {
      // Having no services causes the manager to behave strangely. Notably, listeners are never 
      // fired.  To avoid this we substitute a placeholder service.
      logger.log(Level.WARNING,
          "ServiceManager configured with no services.  Is your application configured properly?",
          new EmptyServiceManagerWarning());
      copy = ImmutableList.<Service>of(new NoOpService());
    }
    this.state = new ServiceManagerState(copy.size());
    ImmutableMap.Builder<Service, ServiceListener> builder = ImmutableMap.builder();
    Executor executor = MoreExecutors.sameThreadExecutor();
    for (Service service : copy) {
      ServiceListener listener = new ServiceListener(service, state);
      service.addListener(listener, executor);
      // We check the state after adding the listener as a way to ensure that our listener was added
      // to a NEW service.
      checkArgument(service.state() == State.NEW, "Can only manage NEW services, %s", service);
      builder.put(service, listener);
    }
    this.services = builder.build();
  }
  
  
Constructs a new instance for managing the given services. This constructor is provided so that dependency injection frameworks can inject instances of ServiceManager.

Parameters:
services The services to manage
Throws:
IllegalArgumentException if not all services are new.
  @Inject ServiceManager(Set<Service> services) {
    this((Iterable<Service>) services);
  }
  
  
Registers a Listener to be executed on the given executor. The listener will not have previous state changes replayed, so it is suggested that listeners are added before any of the managed services are started.

There is no guaranteed ordering of execution of listeners, but any listener added through this method is guaranteed to be called whenever there is a state change.

Exceptions thrown by a listener will be propagated up to the executor. Any exception thrown during Executor.execute (e.g., a RejectedExecutionException or an exception thrown by inline execution) will be caught and logged.

For fast, lightweight listeners that would be safe to execute in any thread, consider calling addListener(Listener).

Parameters:
listener the listener to run when the manager changes state
executor the executor in which the listeners callback methods will be run.
  public void addListener(Listener listener, Executor executor) {
    state.addListener(listener, executor);
  }

  
Registers a Listener to be run when this ServiceManager changes state. The listener will not have previous state changes replayed, so it is suggested that listeners are added before any of the managed services are started.

There is no guaranteed ordering of execution of listeners, but any listener added through this method is guaranteed to be called whenever there is a state change.

Exceptions thrown by a listener will be caught and logged.

Parameters:
listener the listener to run when the manager changes state
  public void addListener(Listener listener) {
    state.addListener(listener, MoreExecutors.sameThreadExecutor());
  }

  
Initiates service startup on all the services being managed. It is only valid to call this method if all of the services are new.

Returns:
this
Throws:
IllegalStateException if any of the Services are not new when the method is called.
  public ServiceManager startAsync() {
    for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) {
      Service service = entry.getKey();
      State state = service.state();
      checkState(state == State.NEW, "Service %s is %s, cannot start it.", service,
          state);
    }
    for (ServiceListener service : services.values()) {
      try {
        service.start();
      } catch (IllegalStateException e) {
        // This can happen if the service has already been started or stopped (e.g. by another 
        // service or listener). Our contract says it is safe to call this method if
        // all services were NEW when it was called, and this has already been verified above, so we
        // don't propagate the exception.
        logger.log(Level.WARNING, "Unable to start Service " + service.service, e);
      }
    }
    return this;
  }
  
  
Waits for the ServiceManager to become healthy. The manager will become healthy after all the component services have reached the running state.

Throws:
IllegalStateException if the service manager reaches a state from which it cannot become healthy.
  public void awaitHealthy() {
    state.awaitHealthy();
    checkState(isHealthy(), "Expected to be healthy after starting");
  }
  
  
Waits for the ServiceManager to become healthy for no more than the given time. The manager will become healthy after all the component services have reached the running state.

Parameters:
timeout the maximum time to wait
unit the time unit of the timeout argument
Throws:
TimeoutException if not all of the services have finished starting within the deadline
IllegalStateException if the service manager reaches a state from which it cannot become healthy.
  public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
    if (!state.awaitHealthy(timeout, unit)) {
      // It would be nice to tell the caller who we are still waiting on, and this information is 
      // likely to be in servicesByState(), however due to race conditions we can't actually tell 
      // which services are holding up healthiness. The current set of NEW or STARTING services is
      // likely to point out the culprit, but may not.  If we really wanted to solve this we could
      // change state to track exactly which services have started and then we could accurately 
      // report on this. But it is only for logging so we likely don't care.
      throw new TimeoutException("Timeout waiting for the services to become healthy.");
    }
    checkState(isHealthy(), "Expected to be healthy after starting");
  }
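
The timed wait above blocks until either every service has started or some service has already stopped, and reports a timeout otherwise. As a rough illustration of that guard-with-deadline pattern, here is a sketch built on java.util.concurrent.locks rather than Guava's Monitor. The class HealthGate and its method names are hypothetical, and unlike the uninterruptible wait used by ServiceManager this sketch treats an interrupt as a failed wait.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a monitor plus guard, using only JDK locks. */
final class HealthGate {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition changed = lock.newCondition();
  private final int total;
  private int unstarted;
  private int unstopped;

  HealthGate(int total) {
    this.total = total;
    this.unstarted = total;
    this.unstopped = total;
  }

  void serviceStarted() {
    lock.lock();
    try {
      unstarted--;
      changed.signalAll();
    } finally {
      lock.unlock();
    }
  }

  void serviceStopped() {
    lock.lock();
    try {
      unstopped--;
      changed.signalAll();
    } finally {
      lock.unlock();
    }
  }

  /** Returns true once all services started or any service stopped; false on timeout. */
  boolean awaitSettled(long timeout, TimeUnit unit) {
    long remainingNanos = unit.toNanos(timeout);
    lock.lock();
    try {
      while (!(unstarted == 0 || unstopped != total)) {
        if (remainingNanos <= 0) {
          return false;
        }
        remainingNanos = changed.awaitNanos(remainingNanos);
      }
      return true;
    } catch (InterruptedException e) {
      // Simplification for this sketch: restore the flag and report failure.
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlock();
    }
  }
}
```

A caller would still need to check whether the manager is actually healthy after a successful wait, since the guard is also satisfied when a service has failed or terminated.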

  
Initiates service shutdown if necessary on all the services being managed.

Returns:
this
  public ServiceManager stopAsync() {
    for (Service service : services.keySet()) {
      service.stop();
    }
    return this;
  }
 
  
Waits for all the services to reach a terminal state. After this method returns all services will either be terminated or failed.
  public void awaitStopped() {
    state.awaitStopped();
  }
  
  
Waits for all the services to reach a terminal state for no more than the given time. After this method returns all services will either be terminated or failed.

Parameters:
timeout the maximum time to wait
unit the time unit of the timeout argument
Throws:
TimeoutException if not all of the services have stopped within the deadline
  public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
    if (!state.awaitStopped(timeout, unit)) {
      throw new TimeoutException("Timeout waiting for the services to stop.");
    }
  }
  
  
Returns true if all services are currently in the running state.

Users who want more detailed information should use the servicesByState method to get detailed information about which services are not running.

  public boolean isHealthy() {
    for (Service service : services.keySet()) {
      if (!service.isRunning()) {
        return false;
      }
    }
    return true;
  }
  
  
Provides a snapshot of the current state of all the services under management.

N.B. This snapshot is not guaranteed to be consistent, i.e. the set of states returned may not correspond to any particular point-in-time view of the services.

  public ImmutableMultimap<State, Service> servicesByState() {
    ImmutableMultimap.Builder<State, Service> builder = ImmutableMultimap.builder();
    for (Service service : services.keySet()) {
      if (!(service instanceof NoOpService)) {
        builder.put(service.state(), service);
      }
    }
    return builder.build();
  }
  
  
Returns the service load times. This value will only return startup times for services that have finished starting.

Returns:
Map of services and their corresponding startup time in millis, the map entries will be ordered by startup time.
  public ImmutableMap<Service, Long> startupTimes() {
    List<Entry<Service, Long>> loadTimes = Lists.newArrayListWithCapacity(services.size());
    for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) {
      Service service = entry.getKey();
      State state = service.state();
      // Only services that have finished starting have a meaningful startup time.
      if (state != State.NEW && state != State.STARTING && !(service instanceof NoOpService)) {
        loadTimes.add(Maps.immutableEntry(service, entry.getValue().startupTimeMillis()));
      }
    }
    Collections.sort(loadTimes, Ordering.<Long>natural()
        .onResultOf(new Function<Entry<Service, Long>, Long>() {
          @Override public Long apply(Map.Entry<Service, Long> input) {
            return input.getValue();
          }
        }));
    ImmutableMap.Builder<Service, Long> builder = ImmutableMap.builder();
    for (Entry<Service, Long> entry : loadTimes) {
      builder.put(entry);
    }
    return builder.build();
  }
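
The method above returns a map whose iteration order follows ascending startup time. The same idea can be sketched with JDK collections alone, using a LinkedHashMap as a stand-in for ImmutableMap; the class StartupTimes and the method orderedByValue are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: copies a map into one that iterates in ascending value order. */
final class StartupTimes {
  static <K> Map<K, Long> orderedByValue(Map<K, Long> timesMillis) {
    List<Map.Entry<K, Long>> entries = new ArrayList<>(timesMillis.entrySet());
    // Sort the entries by their value (the startup time in milliseconds).
    entries.sort(Map.Entry.comparingByValue());
    // LinkedHashMap preserves insertion order, so iteration follows the sort.
    Map<K, Long> ordered = new LinkedHashMap<>();
    for (Map.Entry<K, Long> entry : entries) {
      ordered.put(entry.getKey(), entry.getValue());
    }
    return ordered;
  }
}
```

Unlike the ImmutableMap returned by startupTimes, the map produced by this sketch is mutable, so a caller wanting the same guarantees would need to wrap it.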
  
  @Override public String toString() {
    return Objects.toStringHelper(ServiceManager.class)
        .add("services", Collections2.filter(services.keySet(), not(instanceOf(NoOpService.class))))
        .toString();
  }
  
  
An encapsulation of all the mutable state of the ServiceManager that needs to be accessed by instances of ServiceListener.
  private static final class ServiceManagerState {
    final Monitor monitor = new Monitor();
    final int numberOfServices;
    
The number of services that have not finished starting up.
    @GuardedBy("monitor")
    int unstartedServices;
    
The number of services that have not reached a terminal state.
    @GuardedBy("monitor")
    int unstoppedServices;
    
Controls how long to wait for the service manager to either become healthy or reach a state where it is guaranteed that it can never become healthy.
    final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) {
      @Override public boolean isSatisfied() {
        // All services have started or some service has terminated/failed.
        return unstartedServices == 0 || unstoppedServices != numberOfServices;
      }
    };
    
Controls how long to wait for all services to reach a terminal state.
    final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) {
      @Override public boolean isSatisfied() {
        return unstoppedServices == 0;
      }
    };
    
The listeners to notify during a state transition.
    @GuardedBy("monitor")
    final List<ListenerExecutorPair> listeners = Lists.newArrayList();
    
The queue of listeners that are waiting to be executed.

Enqueue operations should be protected by monitor while dequeue operations are not protected. Holding monitor while enqueuing ensures that listeners in the queue are in the correct order and ExecutionQueue ensures that they are executed in the correct order.

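    @GuardedBy("monitor")
    final ExecutionQueue queuedListeners = new ExecutionQueue();
    
    ServiceManagerState(int numberOfServices) {
      this.numberOfServices = numberOfServices;
      this.unstoppedServices = numberOfServices;
      this.unstartedServices = numberOfServices;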
    }
    
    void addListener(Listener listener, Executor executor) {
      checkNotNull(listener, "listener");
      checkNotNull(executor, "executor");
      monitor.enter();
      try {
        // no point in adding a listener that will never be called
        if (unstartedServices > 0 || unstoppedServices > 0) {
          listeners.add(new ListenerExecutorPair(listener, executor));
        }
      } finally {
        monitor.leave();
      }
    }
    
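    void awaitHealthy() {
      monitor.enterWhenUninterruptibly(awaitHealthGuard);
      monitor.leave();
    }
    
    boolean awaitHealthy(long timeout, TimeUnit unit) {
      if (monitor.enterWhenUninterruptibly(awaitHealthGuard, timeout, unit)) {
        monitor.leave();
        return true;
      }
      return false;
    }
    
    void awaitStopped() {
      monitor.enterWhenUninterruptibly(stoppedGuard);
      monitor.leave();
    }
    
    boolean awaitStopped(long timeout, TimeUnit unit) {
      if (monitor.enterWhenUninterruptibly(stoppedGuard, timeout, unit)) {
        monitor.leave();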
        return true;
      }
      return false;
    }
    
    
This should be called when a service finishes starting up.

Parameters:
currentlyHealthy whether or not the service that finished starting was healthy at the time that it finished starting.
    @GuardedBy("monitor")
    private void serviceFinishedStarting(Service service, boolean currentlyHealthy) {
      checkState(unstartedServices > 0, 
          "All services should have already finished starting but %s just finished.", service);
      unstartedServices--;
      if (currentlyHealthy && unstartedServices == 0 && unstoppedServices == numberOfServices) {
        // This means that the manager is currently healthy, or at least it should have been
        // healthy at some point from some perspective. Calling isHealthy is not currently
        // guaranteed to return true because any service could fail right now. However, the
        // happens-before relationship enforced by the monitor ensures that this method was called
        // before either serviceTerminated or serviceFailed, so we know that the manager was at 
        // least healthy for some period of time. Furthermore we are guaranteed that this call to
        // healthy() will be before any call to stopped() or failure(Service) on the listener.
        // So it is correct to execute the listener's healthy() callback.
        for (final ListenerExecutorPair pair : listeners) {
          queuedListeners.add(new Runnable() {
            @Override public void run() {
              pair.listener.healthy();
            }
          }, pair.executor);
        }
      }
    }
    
    
This should be called when a service is terminated.
    @GuardedBy("monitor")
    private void serviceTerminated(Service service) {
      serviceStopped(service);
    }
    
    
This should be called when a service is failed.
    @GuardedBy("monitor")
    private void serviceFailed(final Service service) {
      for (final ListenerExecutorPair pair : listeners) {
        queuedListeners.add(new Runnable() {
          @Override public void run() {
            pair.listener.failure(service);
          }
        }, pair.executor);
      }
      serviceStopped(service);
    }
    
    
Should be called whenever a service reaches a terminal state ( terminated or failed).
    @GuardedBy("monitor")
    private void serviceStopped(Service service) {
      checkState(unstoppedServices > 0, 
          "All services should have already stopped but %s just stopped.", service);
      unstoppedServices--;
      if (unstoppedServices == 0) {
        // Preconditions message templates only substitute %s placeholders.
        checkState(unstartedServices == 0, 
            "All services are stopped but %s services haven't finished starting",
            unstartedServices);
        for (final ListenerExecutorPair pair : listeners) {
          queuedListeners.add(new Runnable() {
            @Override public void run() {
              pair.listener.stopped();
            }
          }, pair.executor);
        }
        // no more listeners could possibly be called, so clear them out
        listeners.clear();
      }
    }

    
Attempts to execute all the listeners in queuedListeners.
    private void executeListeners() {
      checkState(!monitor.isOccupiedByCurrentThread(), 
          "It is incorrect to execute listeners with the monitor held.");
      queuedListeners.execute();
    }
  }
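
The state class above enqueues listener callbacks while the monitor is held and runs them only after it has been released, so that listener code never executes under the manager's lock. A rough JDK-only sketch of that pattern follows. DeferredCallbacks and its members are hypothetical, the example is meant for single-threaded demonstration, and it does not reproduce the ordering guarantees of Guava's ExecutionQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: enqueue callbacks under a lock, run them after releasing it. */
final class DeferredCallbacks {
  private final ReentrantLock lock = new ReentrantLock();
  private final Queue<Runnable> queued = new ConcurrentLinkedQueue<>();
  final List<String> log = new ArrayList<>();

  void transition(String event) {
    lock.lock();
    try {
      // Enqueue while holding the lock so callbacks keep the order of the transitions.
      queued.add(() -> log.add(event + ":lockHeld=" + lock.isHeldByCurrentThread()));
    } finally {
      lock.unlock();
    }
    // Run callbacks only after the lock has been released.
    executeQueued();
  }

  private void executeQueued() {
    if (lock.isHeldByCurrentThread()) {
      throw new IllegalStateException("callbacks must not run with the lock held");
    }
    Runnable next;
    while ((next = queued.poll()) != null) {
      next.run();
    }
  }
}
```

Running callbacks outside the lock means a slow or re-entrant listener cannot block state transitions or deadlock against the lock.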

  
A Service.Listener that observes a single service, times how long it takes to start, and calls ServiceManagerState.serviceFinishedStarting, ServiceManagerState.serviceTerminated and ServiceManagerState.serviceFailed according to the service's current state.
  private static final class ServiceListener extends Service.Listener {
    @GuardedBy("watch")  // AFAICT Stopwatch is not thread safe so we need to protect accesses
    final Stopwatch watch = Stopwatch.createUnstarted();
    final Service service;
    final ServiceManagerState state;
    
    

Parameters:
service the service that
    ServiceListener(Service service, ServiceManagerState state) {
      this.service = service;
      this.state = state;
    }
    
    @Override public void starting() {
      // This can happen if someone besides the ServiceManager starts the service, in this case
      // our timings may be inaccurate.
      startTimer();
    }
    
    @Override public void running() {
      state.monitor.enter();
      try {
        finishedStarting(true);
      } finally {
        state.monitor.leave();
        state.executeListeners();
      }
    }
    
    @Override public void stopping(State from) {
      if (from == State.STARTING) {
        state.monitor.enter();
        try {
          finishedStarting(false);
        } finally {
          state.monitor.leave();
          state.executeListeners();
        }
      }
    }
    
    @Override public void terminated(State from) {
      if (!(service instanceof NoOpService)) {
        logger.log(Level.INFO, "Service {0} has terminated. Previous state was: {1}",
            new Object[] {service, from});
      }
      state.monitor.enter();
      try {
        if (from == State.NEW) {
          // startTimer is idempotent, so this is safe to call and it may be necessary if no one has
          // started the timer yet.
          startTimer(); 
          finishedStarting(false);
        }
        state.serviceTerminated(service);
      } finally {
        state.monitor.leave();
        state.executeListeners();
      }
    }
    
    @Override public void failed(State from, Throwable failure) {
      logger.log(Level.SEVERE, "Service " + service + " has failed in the " + from + " state.",
          failure);
      state.monitor.enter();
      try {
        if (from == State.STARTING) {
          finishedStarting(false);
        }
        state.serviceFailed(service);
      } finally {
        state.monitor.leave();
        state.executeListeners();
      }
    }
    
    
Stop the stopwatch, log the startup time and decrement the startup latch

Parameters:
currentlyHealthy whether or not the service that finished starting is currently healthy
    @GuardedBy("monitor")
    void finishedStarting(boolean currentlyHealthy) {
      synchronized (watch) {
        watch.stop();
        if (!(service instanceof NoOpService)) {
          logger.log(Level.INFO, "Started {0} in {1} ms.",
              new Object[] {service, startupTimeMillis()});
        }
      }
      state.serviceFinishedStarting(service, currentlyHealthy);
    }
    
    void start() {
      startTimer();
      service.startAsync();
    }
  
    
Start the timer if it hasn't been started.
    void startTimer() {
      synchronized (watch) {
        if (!watch.isRunning()) { // only start the watch once.
          watch.start();
          if (!(service instanceof NoOpService)) {
            logger.log(Level.INFO, "Starting {0}.", service);
          }
        }
      }
    }
    
    
Returns the amount of time it took for the service to finish starting in milliseconds.
    long startupTimeMillis() {
      synchronized (watch) {
        return watch.elapsed(MILLISECONDS);
      }
    }
  }
  
  
Simple value object binding a listener to its executor.
  @Immutable private static final class ListenerExecutorPair {
    final Listener listener;
    final Executor executor;
    
    ListenerExecutorPair(Listener listener, Executor executor) {
      this.listener = listener;
      this.executor = executor;
    }
  }
  
  
A Service instance that does nothing. This is only useful as a placeholder to ensure that the ServiceManager functions properly even when it is managing no services.

The use of this class is considered an implementation detail of the class and as such it is excluded from servicesByState, startupTimes, toString and all logging statements.

  private static final class NoOpService extends AbstractService {
    @Override protected void doStart() { notifyStarted(); }
    @Override protected void doStop() { notifyStopped(); }
  }
  
  
This is never thrown but only used for logging.
  private static final class EmptyServiceManagerWarning extends Throwable {}