/* * Copyright (C) 2012 The Guava Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.google.common.util.concurrent; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Predicates.equalTo; import static com.google.common.base.Predicates.in; import static com.google.common.base.Predicates.instanceOf; import static com.google.common.base.Predicates.not; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.common.util.concurrent.Service.State.FAILED; import static com.google.common.util.concurrent.Service.State.NEW; import static com.google.common.util.concurrent.Service.State.RUNNING; import static com.google.common.util.concurrent.Service.State.STARTING; import static com.google.common.util.concurrent.Service.State.STOPPING; import static com.google.common.util.concurrent.Service.State.TERMINATED; import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.common.annotations.Beta; import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimaps; import com.google.common.collect.Multiset; import com.google.common.collect.Ordering; import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenerCallQueue.Callback; import com.google.common.util.concurrent.Service.State; import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; /** * A manager for monitoring and controlling a set of {@linkplain Service services}. This class * provides methods for {@linkplain #startAsync() starting}, {@linkplain #stopAsync() stopping} and * {@linkplain #servicesByState inspecting} a collection of {@linkplain Service services}. * Additionally, users can monitor state transitions with the {@linkplain Listener listener} * mechanism. * *
 * <p>While it is recommended that service lifecycles be managed via this class, state transitions
 * initiated via other mechanisms do not impact the correctness of its methods. For example, if the
 * services are started by some mechanism besides {@link #startAsync}, the listeners will be invoked
 * when appropriate and {@link #awaitHealthy} will still work as expected.
 *
 * <p>Here is a simple example of how to use a {@code ServiceManager} to start a server.
 *
 * <pre>   {@code
 * class Server {
 *   public static void main(String[] args) {
 *     Set<Service> services = ...;
 *     ServiceManager manager = new ServiceManager(services);
 *     manager.addListener(new Listener() {
 *         public void stopped() {}
 *         public void healthy() {
 *           // Services have been initialized and are healthy, start accepting requests...
 *         }
 *         public void failure(Service service) {
 *           // Something failed, at this point we could log it, notify a load balancer, or take
 *           // some other action. For now we will just exit.
 *           System.exit(1);
 *         }
 *       },
 *       MoreExecutors.directExecutor());
 *
 *     Runtime.getRuntime().addShutdownHook(new Thread() {
 *       public void run() {
 *         // Give the services 5 seconds to stop to ensure that we are responsive to shutdown
 *         // requests.
 *         try {
 *           manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
 *         } catch (TimeoutException timeout) {
 *           // stopping timed out
 *         }
 *       }
 *     });
 *     manager.startAsync(); // start all the services asynchronously
 *   }
 * }}</pre>
 *
 * <p>This class uses the ServiceManager's methods to start all of its services, to respond to
* service failure and to ensure that when the JVM is shutting down all the services are stopped.
*
* @author Luke Sandberg
* @since 14.0
*/
@Beta
public final class ServiceManager {
// Shared logger: startAsync reports services it could not start at WARNING, and
// transitionService reports startup durations at FINE.
private static final Logger logger = Logger.getLogger(ServiceManager.class.getName());
private static final Callback This will be called at most once after all the services have entered the
* {@linkplain State#RUNNING running} state. If any services fail during start up or
* {@linkplain State#FAILED fail}/{@linkplain State#TERMINATED terminate} before all other
* services have started {@linkplain State#RUNNING running} then this method will not be called.
*/
// Empty default body: nothing happens unless a subclass overrides this callback.
public void healthy() {}
/**
 * Called when all of the component services have reached a terminal state, either
 * {@linkplain State#TERMINATED terminated} or {@linkplain State#FAILED failed}.
 *
 * <p>The default implementation does nothing.
 */
public void stopped() {}
/**
 * Called when a component service has {@linkplain State#FAILED failed}.
 *
 * <p>The default implementation does nothing.
 *
 * @param service the service that failed
 */
public void failure(Service service) {}
}
/**
 * An encapsulation of all of the state that is accessed by the {@linkplain ServiceListener
 * service listeners}. This is extracted into its own object so that {@link ServiceListener}
 * could be made {@code static} and its instances can be safely constructed and added in the
 * {@link ServiceManager} constructor without having to close over the partially constructed
 * {@link ServiceManager} instance (i.e. avoid leaking a pointer to {@code this}).
 *
 * <p>The {@code addListener}, {@code awaitHealthy} and {@code awaitStopped} methods of this class
 * forward directly to this object.
 */
private final ServiceManagerState state;
private final ImmutableList {@code addListener} guarantees execution ordering across calls to a given listener but not
* across calls to multiple listeners. Specifically, a given listener will have its callbacks
* invoked in the same order as the underlying service enters those states. Additionally, at most
* one of the listener's callbacks will execute at once. However, multiple listeners' callbacks
* may execute concurrently, and listeners may execute in an order different from the one in which
* they were registered.
*
* RuntimeExceptions thrown by a listener will be caught and logged. Any exception thrown
* during {@code Executor.execute} (e.g., a {@code RejectedExecutionException}) will be caught and
* logged.
*
* For fast, lightweight listeners that would be safe to execute in any thread, consider
* calling {@link #addListener(Listener)}.
*
* @param listener the listener to run when the manager changes state
* @param executor the executor in which the listeners callback methods will be run.
*/
public void addListener(Listener listener, Executor executor) {
// Registration is forwarded unchanged to the shared state object.
state.addListener(listener, executor);
}
/**
 * Registers a {@link Listener} to be run when this {@link ServiceManager} changes state. The
 * listener will not have previous state changes replayed, so it is suggested that listeners are
 * added before any of the managed services are {@linkplain Service#startAsync started}.
 *
 * <p>{@code addListener} guarantees execution ordering across calls to a given listener but not
 * across calls to multiple listeners. Specifically, a given listener will have its callbacks
 * invoked in the same order as the underlying service enters those states. Additionally, at most
 * one of the listener's callbacks will execute at once. However, multiple listeners' callbacks
 * may execute concurrently, and listeners may execute in an order different from the one in which
 * they were registered.
 *
 * <p>RuntimeExceptions thrown by a listener will be caught and logged.
 *
 * <p>This overload registers the listener with {@code directExecutor()} as its executor.
 *
 * @param listener the listener to run when the manager changes state
 */
public void addListener(Listener listener) {
state.addListener(listener, directExecutor());
}
/**
 * Kicks off {@linkplain Service#startAsync startup} of every managed service. Calling this method
 * is only valid while all of the services are still {@linkplain State#NEW new}.
 *
 * @return this
 * @throws IllegalStateException if any of the Services are not {@link State#NEW new} when the
 *     method is called.
 */
public ServiceManager startAsync() {
  // First pass: refuse to start anything unless every service is still NEW.
  for (Service candidate : services) {
    State current = candidate.state();
    checkState(current == NEW, "Service %s is %s, cannot start it.", candidate, current);
  }
  // Second pass: begin timing each service and ask it to start.
  for (Service toStart : services) {
    try {
      state.tryStartTiming(toStart);
      toStart.startAsync();
    } catch (IllegalStateException alreadyTransitioned) {
      // The service may have been started or stopped in the meantime (e.g. by another service
      // or a listener). The contract only requires that every service was NEW when this method
      // was called, which the first pass verified, so the exception is logged, not propagated.
      logger.log(Level.WARNING, "Unable to start Service " + toStart, alreadyTransitioned);
    }
  }
  return this;
}
/**
 * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy}. The manager
 * will become healthy after all the component services have reached the {@linkplain State#RUNNING
 * running} state.
 *
 * <p>This overload takes no timeout; see {@link #awaitHealthy(long, TimeUnit)} for a bounded
 * wait.
 *
 * @throws IllegalStateException if the service manager reaches a state from which it cannot
 *     become {@linkplain #isHealthy() healthy}.
 */
public void awaitHealthy() {
state.awaitHealthy();
}
/**
 * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy} for no more
 * than the given time. The manager will become healthy after all the component services have
 * reached the {@linkplain State#RUNNING running} state.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @throws TimeoutException if not all of the services have finished starting within the deadline
 * @throws IllegalStateException if the service manager reaches a state from which it cannot
 *     become {@linkplain #isHealthy() healthy}.
 */
public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
state.awaitHealthy(timeout, unit);
}
/**
 * Requests {@linkplain Service#stopAsync shutdown}, if necessary, of every service under
 * management.
 *
 * @return this
 */
public ServiceManager stopAsync() {
  for (Service managed : services) {
    managed.stopAsync();
  }
  return this;
}
/**
 * Waits for all the services to reach a terminal state. After this method returns all
 * services will either be {@linkplain Service.State#TERMINATED terminated} or {@linkplain
 * Service.State#FAILED failed}.
 *
 * <p>This overload takes no timeout; see {@link #awaitStopped(long, TimeUnit)} for a bounded
 * wait.
 */
public void awaitStopped() {
state.awaitStopped();
}
/**
 * Waits for all the services to reach a terminal state for no more than the given time. After
 * this method returns all services will either be {@linkplain Service.State#TERMINATED
 * terminated} or {@linkplain Service.State#FAILED failed}.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @throws TimeoutException if not all of the services have stopped within the deadline
 */
public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
state.awaitStopped(timeout, unit);
}
/**
 * Reports whether every managed service is currently {@linkplain State#RUNNING running}.
 *
 * <p>Users who want more detailed information should use the {@link #servicesByState} method to
 * get detailed information about which services are not running.
 *
 * @return {@code true} if no managed service reports that it is not running
 */
public boolean isHealthy() {
  boolean everyServiceRunning = true;
  for (Service managed : services) {
    if (!managed.isRunning()) {
      // One non-running service is enough to make the manager unhealthy.
      everyServiceRunning = false;
      break;
    }
  }
  return everyServiceRunning;
}
/**
* Provides a snapshot of the current state of all the services under management.
*
* N.B. This snapshot is guaranteed to be consistent, i.e. the set of states returned will
* correspond to a point in time view of the services.
*/
public ImmutableMultimap Together, they allow us to enforce that all services have their listeners installed prior
* to any service performing a transition, then we can fail in the ServiceManager constructor
* rather than in a Service.Listener callback.
*/
// Consulted by transitionService: while this is false, a transition only sets 'transitioned'
// and the per-state bookkeeping is skipped.
@GuardedBy("monitor")
boolean ready;
// Set to true by transitionService whenever any service reports a state change.
@GuardedBy("monitor")
boolean transitioned;
// Service count that the guards and transitionService compare per-state counts against.
final int numberOfServices;
/**
 * Guard used while waiting for the manager to become healthy. It is satisfied as soon as the
 * wait can end: either every service is running, or some service has reached a state from which
 * the manager is guaranteed never to become healthy.
 */
final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) {
  @Override
  public boolean isSatisfied() {
    // Healthy: every managed service is RUNNING.
    if (states.count(RUNNING) == numberOfServices) {
      return true;
    }
    // Otherwise stop waiting only if some service is already stopping, terminated or failed.
    return states.contains(STOPPING) || states.contains(TERMINATED) || states.contains(FAILED);
  }
};
/**
 * Guard used while waiting for shutdown: satisfied once every service is accounted for as either
 * terminated or failed.
 */
final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) {
  @Override
  public boolean isSatisfied() {
    // Terminal states are TERMINATED and FAILED; together they must cover all services.
    return numberOfServices == states.count(TERMINATED) + states.count(FAILED);
  }
};
/** The listeners to notify during a state transition. */
@GuardedBy("monitor")
final List This method performs the main logic of ServiceManager in the following steps.
* The use of this class is considered an implementation detail of ServiceManager and as such
* it is excluded from {@link #servicesByState}, {@link #startupTimes}, {@link #toString} and all
* logging statements.
*/
private static final class NoOpService extends AbstractService {
  /** Has nothing to initialize, so it reports a successful start immediately. */
  @Override
  protected void doStart() {
    notifyStarted();
  }

  /** Has nothing to release, so it reports a successful stop immediately. */
  @Override
  protected void doStop() {
    notifyStopped();
  }
}
/**
 * A marker {@link Throwable} that is never thrown; it exists only so that it can be handed to a
 * logging call.
 */
private static final class EmptyServiceManagerWarning extends Throwable {}
}
*
*/
void transitionService(final Service service, State from, State to) {
// Records that 'service' moved from state 'from' to state 'to': updates the state map and the
// startup timer under the monitor, queues the listener callbacks that apply, and finally runs
// the queued callbacks once the monitor has been released.
// Rejects a null service and a "transition" that does not change state.
checkNotNull(service);
checkArgument(from != to);
monitor.enter();
try {
// Always note that some transition happened, even if the bookkeeping below is skipped.
transitioned = true;
if (!ready) {
// Not ready yet: leave the state map untouched. The finally block below still runs.
return;
}
// Update state.
// Both checks fail with IllegalStateException if the map disagrees with the reported
// transition: the service must be filed under 'from' and must not already be under 'to'.
checkState(servicesByState.remove(from, service),
"Service %s not at the expected location in the state map %s", service, from);
checkState(servicesByState.put(to, service),
"Service %s in the state map unexpectedly at %s", service, to);
// Update the timer
Stopwatch stopwatch = startupTimers.get(service);
if (stopwatch == null) {
// This means the service was started by some means other than ServiceManager.startAsync
stopwatch = Stopwatch.createStarted();
startupTimers.put(service, stopwatch);
}
// Enum compareTo follows declaration order, so this matches RUNNING and every State constant
// declared after it; the timer is stopped the first time such a state is reached.
if (to.compareTo(RUNNING) >= 0 && stopwatch.isRunning()) {
// N.B. if we miss the STARTING event then we may never record a startup time.
stopwatch.stop();
// NoOpService instances are kept out of the log output.
if (!(service instanceof NoOpService)) {
logger.log(Level.FINE, "Started {0} in {1}.", new Object[] {service, stopwatch});
}
}
// Queue our listeners
// Did a service fail?
if (to == FAILED) {
fireFailedListeners(service);
}
// NOTE(review): 'states' is declared outside this view; presumably it reflects the keys of
// servicesByState so that the counts below already include the update above - confirm.
if (states.count(RUNNING) == numberOfServices) {
// This means that the manager is currently healthy. N.B. If other threads call isHealthy
// they are not guaranteed to get 'true', because any service could fail right now.
fireHealthyListeners();
} else if (states.count(TERMINATED) + states.count(FAILED) == numberOfServices) {
// Every service is in a terminal state, so the manager as a whole has stopped.
fireStoppedListeners();
}
} finally {
monitor.leave();
// Run our executors outside of the lock
executeListeners();
}
}
/**
 * Enqueues {@code STOPPED_CALLBACK} on {@code listeners}. This only queues the callback;
 * {@code transitionService} calls {@code executeListeners()} after it has left the monitor.
 */
@GuardedBy("monitor")
void fireStoppedListeners() {
STOPPED_CALLBACK.enqueueOn(listeners);
}
/**
 * Enqueues {@code HEALTHY_CALLBACK} on {@code listeners}. This only queues the callback;
 * {@code transitionService} calls {@code executeListeners()} after it has left the monitor.
 */
@GuardedBy("monitor")
void fireHealthyListeners() {
HEALTHY_CALLBACK.enqueueOn(listeners);
}
@GuardedBy("monitor")
void fireFailedListeners(final Service service) {
new Callback