Start line:  
End line:  

Snippet Preview

Snippet HTML Code

Stack Overflow Questions
  /*
   * Copyright (C) 2012 The Guava Authors
   *
   * Licensed under the Apache License, Version 2.0 (the "License");
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package com.google.common.util.concurrent;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Predicates.equalTo;
 import static com.google.common.base.Predicates.in;
 import static com.google.common.base.Predicates.instanceOf;
 import static com.google.common.base.Predicates.not;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static com.google.common.util.concurrent.Service.State.FAILED;
 import static com.google.common.util.concurrent.Service.State.NEW;
 import static com.google.common.util.concurrent.Service.State.RUNNING;
 import static com.google.common.util.concurrent.Service.State.STARTING;
 import static com.google.common.util.concurrent.Service.State.STOPPING;
 import static com.google.common.util.concurrent.Service.State.TERMINATED;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
A manager for monitoring and controlling a set of services. This class provides methods for starting, stopping and inspecting a collection of services. Additionally, users can monitor state transitions with the listener mechanism.

While it is recommended that service lifecycles be managed via this class, state transitions initiated via other mechanisms do not impact the correctness of its methods. For example, if the services are started by some mechanism besides startAsync(), the listeners will be invoked when appropriate and awaitHealthy() will still work as expected.

Here is a simple example of how to use a ServiceManager to start a server.

   class Server {
   public static void main(String[] args) {
     Set<Service> services = ...;
     ServiceManager manager = new ServiceManager(services);
     manager.addListener(new Listener() {
         public void stopped() {
         public void healthy() {
           // Services have been initialized and are healthy, start accepting requests...
         }
         public void failure(Service service) {
           // Something failed, at this point we could log it, notify a load balancer, or take
           // some other action.  For now we will just exit.
           System.exit(1);
         }
       },
       MoreExecutors.directExecutor());

     Runtime.getRuntime().addShutdownHook(new Thread() {
       public void run() {
         // Give the services 5 seconds to stop to ensure that we are responsive to shutdown
         // requests.
         try {
           manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
         } catch (TimeoutException timeout) {
           // stopping timed out
         }
       }
     });
     manager.startAsync();  // start all the services asynchronously
   }
 }}

This class uses the ServiceManager's methods to start all of its services, to respond to service failure and to ensure that when the JVM is shutting down all the services are stopped.

Author(s):
Luke Sandberg
Since:
14.0
public final class ServiceManager {
  private static final Logger logger = Logger.getLogger(ServiceManager.class.getName());
  private static final Callback<ListenerHEALTHY_CALLBACK = new Callback<Listener>("healthy()") {
    @Override void call(Listener listener) {
      listener.healthy();
    }
  };
  private static final Callback<ListenerSTOPPED_CALLBACK = new Callback<Listener>("stopped()") {
    @Override void call(Listener listener) {
      listener.stopped();
    }
  };

  
A listener for the aggregate state changes of the services that are under management. Users that need to listen to more fine-grained events (such as when each particular Service starts, or terminates), should attach service listeners to each individual service.

Author(s):
Luke Sandberg
Since:
15.0 (present as an interface in 14.0)
  @Beta  // Should come out of Beta when ServiceManager does
  public abstract static class Listener {
    
Called when the service initially becomes healthy.

This will be called at most once after all the services have entered the running state. If any services fail during start up or fail/terminate before all other services have started running then this method will not be called.

    public void healthy() {}

    
Called when the all of the component services have reached a terminal state, either terminated or failed.
    public void stopped() {}

    
Called when a component service has failed.

Parameters:
service The service that failed.
    public void failure(Service service) {}
  }

  
An encapsulation of all of the state that is accessed by the service listeners. This is extracted into its own object so that ServiceManager.ServiceListener could be made static and its instances can be safely constructed and added in the ServiceManager constructor without having to close over the partially constructed ServiceManager instance (i.e. avoid leaking a pointer to this).
  private final ServiceManagerState state;
  private final ImmutableList<Serviceservices;

  
Constructs a new instance for managing the given services.

Parameters:
services The services to manage
Throws:
java.lang.IllegalArgumentException if not all services are new or if there are any duplicate services.
  public ServiceManager(Iterable<? extends Serviceservices) {
    ImmutableList<Servicecopy = ImmutableList.copyOf(services);
    if (copy.isEmpty()) {
      // Having no services causes the manager to behave strangely. Notably, listeners are never
      // fired.  To avoid this we substitute a placeholder service.
      .log(.,
          "ServiceManager configured with no services.  Is your application configured properly?",
          new EmptyServiceManagerWarning());
      copy = ImmutableList.<Service>of(new NoOpService());
    }
    this. = new ServiceManagerState(copy);
    this. = copy;
    WeakReference<ServiceManagerStatestateReference =
    for (Service service : copy) {
      service.addListener(new ServiceListener(servicestateReference), directExecutor());
      // We check the state after adding the listener as a way to ensure that our listener was added
      // to a NEW service.
      checkArgument(service.state() == "Can only manage NEW services, %s"service);
    }
    // We have installed all of our listeners and after this point any state transition should be
    // correct.
    this..markReady();
  }

  
Registers a ServiceManager.Listener to be executed on the given executor. The listener will not have previous state changes replayed, so it is suggested that listeners are added before any of the managed services are started.

addListener guarantees execution ordering across calls to a given listener but not across calls to multiple listeners. Specifically, a given listener will have its callbacks invoked in the same order as the underlying service enters those states. Additionally, at most one of the listener's callbacks will execute at once. However, multiple listeners' callbacks may execute concurrently, and listeners may execute in an order different from the one in which they were registered.

RuntimeExceptions thrown by a listener will be caught and logged. Any exception thrown during Executor.execute (e.g., a RejectedExecutionException) will be caught and logged.

For fast, lightweight listeners that would be safe to execute in any thread, consider calling addListener(com.google.common.util.concurrent.ServiceManager.Listener).

Parameters:
listener the listener to run when the manager changes state
executor the executor in which the listeners callback methods will be run.
  public void addListener(Listener listenerExecutor executor) {
    .addListener(listenerexecutor);
  }

  
Registers a ServiceManager.Listener to be run when this ServiceManager changes state. The listener will not have previous state changes replayed, so it is suggested that listeners are added before any of the managed services are started.

addListener guarantees execution ordering across calls to a given listener but not across calls to multiple listeners. Specifically, a given listener will have its callbacks invoked in the same order as the underlying service enters those states. Additionally, at most one of the listener's callbacks will execute at once. However, multiple listeners' callbacks may execute concurrently, and listeners may execute in an order different from the one in which they were registered.

RuntimeExceptions thrown by a listener will be caught and logged.

Parameters:
listener the listener to run when the manager changes state
  public void addListener(Listener listener) {
    .addListener(listenerdirectExecutor());
  }

  
Initiates service startup on all the services being managed. It is only valid to call this method if all of the services are new.

Returns:
this
Throws:
java.lang.IllegalStateException if any of the Services are not new when the method is called.
  public ServiceManager startAsync() {
    for (Service service : ) {
      State state = service.state();
      checkState(state == "Service %s is %s, cannot start it."servicestate);
    }
    for (Service service : ) {
      try {
        .tryStartTiming(service);
        service.startAsync();
      } catch (IllegalStateException e) {
        // This can happen if the service has already been started or stopped (e.g. by another
        // service or listener). Our contract says it is safe to call this method if
        // all services were NEW when it was called, and this has already been verified above, so we
        // don't propagate the exception.
        .log(."Unable to start Service " + servicee);
      }
    }
    return this;
  }

  
Waits for the ServiceManager to become healthy. The manager will become healthy after all the component services have reached the running state.

Throws:
java.lang.IllegalStateException if the service manager reaches a state from which it cannot become healthy.
  public void awaitHealthy() {
  }

  
Waits for the ServiceManager to become healthy for no more than the given time. The manager will become healthy after all the component services have reached the running state.

Parameters:
timeout the maximum time to wait
unit the time unit of the timeout argument
Throws:
java.util.concurrent.TimeoutException if not all of the services have finished starting within the deadline
java.lang.IllegalStateException if the service manager reaches a state from which it cannot become healthy.
  public void awaitHealthy(long timeoutTimeUnit unitthrows TimeoutException {
    .awaitHealthy(timeoutunit);
  }

  
Initiates service shutdown if necessary on all the services being managed.

Returns:
this
  public ServiceManager stopAsync() {
    for (Service service : ) {
      service.stopAsync();
    }
    return this;
  }

  
Waits for the all the services to reach a terminal state. After this method returns all services will either be terminated or Service.State.FAILED.
  public void awaitStopped() {
  }

  
Waits for the all the services to reach a terminal state for no more than the given time. After this method returns all services will either be terminated or failed.

Parameters:
timeout the maximum time to wait
unit the time unit of the timeout argument
Throws:
java.util.concurrent.TimeoutException if not all of the services have stopped within the deadline
  public void awaitStopped(long timeoutTimeUnit unitthrows TimeoutException {
    .awaitStopped(timeoutunit);
  }

  
Returns true if all services are currently in the running state.

Users who want more detailed information should use the servicesByState() method to get detailed information about which services are not running.

  public boolean isHealthy() {
    for (Service service : ) {
      if (!service.isRunning()) {
        return false;
      }
    }
    return true;
  }

  
Provides a snapshot of the current state of all the services under management.

N.B. This snapshot is guaranteed to be consistent, i.e. the set of states returned will correspond to a point in time view of the services.

    return .servicesByState();
  }

  
Returns the service load times. This value will only return startup times for services that have finished starting.

Returns:
Map of services and their corresponding startup time in millis, the map entries will be ordered by startup time.
    return .startupTimes();
  }
  @Override public String toString() {
    return MoreObjects.toStringHelper(ServiceManager.class)
        .add("services", Collections2.filter(not(instanceOf(NoOpService.class))))
        .toString();
  }

  
An encapsulation of all the mutable state of the ServiceManager that needs to be accessed by instances of ServiceManager.ServiceListener.
  private static final class ServiceManagerState {
    final Monitor monitor = new Monitor();
    @GuardedBy("monitor")
        Multimaps.newSetMultimap(new EnumMap<StateCollection<Service>>(State.class),
            new Supplier<Set<Service>>() {
              @Override public Set<Serviceget() {
                return Sets.newLinkedHashSet();
              }
            });
    @GuardedBy("monitor")
    final Multiset<Statestates = .keys();
    @GuardedBy("monitor")
    final Map<ServiceStopwatchstartupTimers = Maps.newIdentityHashMap();

    
These two booleans are used to mark the state as ready to start. ready: is set by markReady() to indicate that all listeners have been correctly installed transitioned: is set by transitionService(com.google.common.util.concurrent.Service,com.google.common.util.concurrent.Service.State,com.google.common.util.concurrent.Service.State) to indicate that some transition has been performed.

Together, they allow us to enforce that all services have their listeners installed prior to any service performing a transition, then we can fail in the ServiceManager constructor rather than in a Service.Listener callback.

    @GuardedBy("monitor")
    boolean ready;
    @GuardedBy("monitor")
    boolean transitioned;
    final int numberOfServices;

    
Controls how long to wait for all the services to either become healthy or reach a state from which it is guaranteed that it can never become healthy.
    final Monitor.Guard awaitHealthGuard = new Monitor.Guard() {
      @Override public boolean isSatisfied() {
        // All services have started or some service has terminated/failed.
        return .count() == 
            || .contains()
            || .contains()
            || .contains();
      }
    };

    
Controls how long to wait for all services to reach a terminal state.
    final Monitor.Guard stoppedGuard = new Monitor.Guard() {
      @Override public boolean isSatisfied() {
        return .count() + .count() == ;
      }
    };

    
The listeners to notify during a state transition.
    @GuardedBy("monitor")
        Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());

    
It is implicitly assumed that all the services are NEW and that they will all remain NEW until all the Listeners are installed and markReady() is called. It is our caller's responsibility to only call markReady() if all services were new at the time this method was called and when all the listeners were installed.
      this. = services.size();
      .putAll(services);
    }

    
Attempts to start the timer immediately prior to the service being started via Service.startAsync().
    void tryStartTiming(Service service) {
      .enter();
      try {
        Stopwatch stopwatch = .get(service);
        if (stopwatch == null) {
          .put(service, Stopwatch.createStarted());
        }
      } finally {
        .leave();
      }
    }

    
Marks the Service.State as ready to receive transitions. Returns true if no transitions have been observed yet.
    void markReady() {
      .enter();
      try {
        if (!) {
          // nothing has transitioned since construction, good.
           = true;
        } else {
          // This should be an extremely rare race condition.
          List<ServiceservicesInBadStates = Lists.newArrayList();
          for (Service service : servicesByState().values()) {
            if (service.state() != ) {
              servicesInBadStates.add(service);
            }
          }
          throw new IllegalArgumentException("Services started transitioning asynchronously before "
              + "the ServiceManager was constructed: " + servicesInBadStates);
        }
      } finally {
        .leave();
      }
    }
    void addListener(Listener listenerExecutor executor) {
      checkNotNull(listener"listener");
      checkNotNull(executor"executor");
      .enter();
      try {
        // no point in adding a listener that will never be called
        if (!.isSatisfied()) {
          .add(new ListenerCallQueue<Listener>(listenerexecutor));
        }
      } finally {
        .leave();
      }
    }
    void awaitHealthy() {
      try {
        checkHealthy();
      } finally {
        .leave();
      }
    }
    void awaitHealthy(long timeoutTimeUnit unitthrows TimeoutException {
      .enter();
      try {
        if (!.waitForUninterruptibly(timeoutunit)) {
          throw new TimeoutException("Timeout waiting for the services to become healthy. The "
              + "following services have not started: "
              + Multimaps.filterKeys(in(ImmutableSet.of())));
        }
        checkHealthy();
      } finally {
        .leave();
      }
    }
    void awaitStopped() {
      .leave();
    }
    void awaitStopped(long timeoutTimeUnit unitthrows TimeoutException {
      .enter();
      try {
        if (!.waitForUninterruptibly(timeoutunit)) {
          throw new TimeoutException("Timeout waiting for the services to stop. The following "
              + "services have not stopped: "
              + Multimaps.filterKeys(,
                  not(in(ImmutableSet.of()))));
        }
      } finally {
        .leave();
      }
    }
      ImmutableSetMultimap.Builder<StateServicebuilder = ImmutableSetMultimap.builder();
      .enter();
      try {
        for (Entry<StateServiceentry : .entries()) {
          if (!(entry.getValue() instanceof NoOpService)) {
            builder.put(entry.getKey(), entry.getValue());
          }
        }
      } finally {
        .leave();
      }
      return builder.build();
    }
      List<Entry<ServiceLong>> loadTimes;
      .enter();
      try {
        loadTimes = Lists.newArrayListWithCapacity(.size());
        // N.B. There will only be an entry in the map if the service has started
        for (Entry<ServiceStopwatchentry : .entrySet()) {
          Service service = entry.getKey();
          Stopwatch stopWatch = entry.getValue();
          if (!stopWatch.isRunning() && !(service instanceof NoOpService)) {
            loadTimes.add(Maps.immutableEntry(servicestopWatch.elapsed()));
          }
        }
      } finally {
        .leave();
      }
      Collections.sort(loadTimes, Ordering.<Long>natural()
          .onResultOf(new Function<Entry<ServiceLong>, Long>() {
            @Override public Long apply(Map.Entry<ServiceLonginput) {
              return input.getValue();
            }
          }));
      ImmutableMap.Builder<ServiceLongbuilder = ImmutableMap.builder();
      for (Entry<ServiceLongentry : loadTimes) {
        builder.put(entry);
      }
      return builder.build();
    }

    
Updates the state with the given service transition.

This method performs the main logic of ServiceManager in the following steps.

  1. Update the servicesByState()
  2. Update the startupTimers
  3. Based on the new state queue listeners to run
  4. Run the listeners (outside of the lock)
    void transitionService(final Service serviceState fromState to) {
      checkNotNull(service);
      checkArgument(from != to);
      .enter();
      try {
         = true;
        if (!) {
          return;
        }
        // Update state.
        checkState(.remove(fromservice),
            "Service %s not at the expected location in the state map %s"servicefrom);
        checkState(.put(toservice),
            "Service %s in the state map unexpectedly at %s"serviceto);
        // Update the timer
        Stopwatch stopwatch = .get(service);
        if (stopwatch == null) {
          // This means the service was started by some means other than ServiceManager.startAsync
          stopwatch = Stopwatch.createStarted();
          .put(servicestopwatch);
        }
        if (to.compareTo() >= 0 && stopwatch.isRunning()) {
          // N.B. if we miss the STARTING event then we may never record a startup time.
          stopwatch.stop();
          if (!(service instanceof NoOpService)) {
            .log(."Started {0} in {1}."new Object[] {servicestopwatch});
          }
        }
        // Queue our listeners
        // Did a service fail?
        if (to == ) {
          fireFailedListeners(service);
        }
        if (.count() == ) {
          // This means that the manager is currently healthy. N.B. If other threads call isHealthy
          // they are not guaranteed to get 'true', because any service could fail right now.
          fireHealthyListeners();
        } else if (.count() + .count() == ) {
          fireStoppedListeners();
        }
      } finally {
        .leave();
        // Run our executors outside of the lock
        executeListeners();
      }
    }
    @GuardedBy("monitor")
    void fireStoppedListeners() {
    }
    @GuardedBy("monitor")
    void fireHealthyListeners() {
    }
    @GuardedBy("monitor")
    void fireFailedListeners(final Service service) {
      new Callback<Listener>("failed({service=" + service + "})") {
        @Override void call(Listener listener) {
          listener.failure(service);
        }
      }.enqueueOn();
    }

    
Attempts to execute all the listeners in listeners.
    void executeListeners() {
          "It is incorrect to execute listeners with the monitor held.");
      // iterate by index to avoid concurrent modification exceptions
      for (int i = 0; i < .size(); i++) {
        .get(i).execute();
      }
    }
    @GuardedBy("monitor")
    void checkHealthy() {
      if (.count() != ) {
        IllegalStateException exception = new IllegalStateException(
            "Expected to be healthy after starting. The following services are not running: "
                + Multimaps.filterKeys(not(equalTo())));
        throw exception;
      }
    }
  }

  
  private static final class ServiceListener extends Service.Listener {
    final Service service;
    // We store the state in a weak reference to ensure that if something went wrong while
    // constructing the ServiceManager we don't pointlessly keep updating the state.
      this. = service;
      this. = state;
    }
    @Override public void starting() {
      ServiceManagerState state = this..get();
      if (state != null) {
        state.transitionService();
        if (!( instanceof NoOpService)) {
          .log(."Starting {0}.");
        }
      }
    }
    @Override public void running() {
      ServiceManagerState state = this..get();
      if (state != null) {
        state.transitionService();
      }
    }
    @Override public void stopping(State from) {
      ServiceManagerState state = this..get();
      if (state != null) {
        state.transitionService(from);
      }
    }
    @Override public void terminated(State from) {
      ServiceManagerState state = this..get();
      if (state != null) {
        if (!( instanceof NoOpService)) {
          .log(."Service {0} has terminated. Previous state was: {1}",
              new Object[] {from});
        }
        state.transitionService(from);
      }
    }
    @Override public void failed(State fromThrowable failure) {
      ServiceManagerState state = this..get();
      if (state != null) {
        // Log before the transition, so that if the process exits in response to server failure,
        // there is a higher likelihood that the cause will be in the logs.
        if (!( instanceof NoOpService)) {
          .log(."Service " +  + " has failed in the " + from + " state.",
              failure);
        }
        state.transitionService(from);
      }
    }
  }

  
A Service instance that does nothing. This is only useful as a placeholder to ensure that the ServiceManager functions properly even when it is managing no services.

The use of this class is considered an implementation detail of ServiceManager and as such it is excluded from ServiceManager.servicesByState(), ServiceManager.startupTimes(), AbstractService.toString() and all logging statements.

  private static final class NoOpService extends AbstractService {
    @Override protected void doStart() { notifyStarted(); }
    @Override protected void doStop() { notifyStopped(); }
  }

  
This is never thrown but only used for logging.
  private static final class EmptyServiceManagerWarning extends Throwable {}
New to GrepCode? Check out our FAQ X