1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
6#define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
7
8#include <cstddef>
9#include <string>
10
11#include "base/base_export.h"
12#include "base/basictypes.h"
13#include "base/callback_forward.h"
14#include "base/memory/ref_counted.h"
15#include "base/memory/scoped_ptr.h"
16#include "base/task_runner.h"
17
// tracked_objects::Location is only ever taken by const reference in this
// header (the |from_here| parameters of the posting functions), so a forward
// declaration is sufficient here.
namespace tracked_objects {

class Location;

}  // namespace tracked_objects
21
22namespace base {
23
// Types that appear in this header only as template arguments or in friend
// declarations. Forward-declaring them keeps the include list short.
class MessageLoopProxy;
class SequencedTaskRunner;

template <typename T>
class DeleteHelper;
29
30// A worker thread pool that enforces ordering between sets of tasks. It also
31// allows you to specify what should happen to your tasks on shutdown.
32//
33// To enforce ordering, get a unique sequence token from the pool and post all
34// tasks you want to order with the token. All tasks with the same token are
35// guaranteed to execute serially, though not necessarily on the same thread.
36// This means that:
37//
38//   - No two tasks with the same token will run at the same time.
39//
40//   - Given two tasks T1 and T2 with the same token such that T2 will
41//     run after T1, then T2 will start after T1 is destroyed.
42//
43//   - If T2 will run after T1, then all memory changes in T1 and T1's
44//     destruction will be visible to T2.
45//
46// Example:
47//   SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken();
48//   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
49//                                FROM_HERE, base::Bind(...));
50//   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
51//                                FROM_HERE, base::Bind(...));
52//
53// You can make named sequence tokens to make it easier to share a token
54// across different components.
55//
56// You can also post tasks to the pool without ordering using PostWorkerTask.
57// These will be executed in an unspecified order. The order of execution
58// between tasks with different sequence tokens is also unspecified.
59//
60// This class may be leaked on shutdown to facilitate fast shutdown. The
61// expected usage, however, is to call Shutdown(), which correctly accounts
62// for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
63// behavior.
64//
65// Implementation note: This does not use a base::WorkerPool since that does
66// not enforce shutdown semantics or allow us to specify how many worker
67// threads to run. For the typical use case of random background work, we don't
68// necessarily want to be super aggressive about creating threads.
69//
70// Note that SequencedWorkerPool is RefCountedThreadSafe (inherited
71// from TaskRunner).
72class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
73 public:
74  // Defines what should happen to a task posted to the worker pool on
75  // shutdown.
76  enum WorkerShutdown {
77    // Tasks posted with this mode which have not run at shutdown will be
78    // deleted rather than run, and any tasks with this mode running at
79    // shutdown will be ignored (the worker thread will not be joined).
80    //
81    // This option provides a nice way to post stuff you don't want blocking
82    // shutdown. For example, you might be doing a slow DNS lookup and if it's
83    // blocked on the OS, you may not want to stop shutdown, since the result
84    // doesn't really matter at that point.
85    //
86    // However, you need to be very careful what you do in your callback when
87    // you use this option. Since the thread will continue to run until the OS
88    // terminates the process, the app can be in the process of tearing down
89    // when you're running. This means any singletons or global objects you
90    // use may suddenly become invalid out from under you. For this reason,
91    // it's best to use this only for slow but simple operations like the DNS
92    // example.
93    CONTINUE_ON_SHUTDOWN,
94
95    // Tasks posted with this mode that have not started executing at
96    // shutdown will be deleted rather than executed. However, any tasks that
97    // have already begun executing when shutdown is called will be allowed
98    // to continue, and will block shutdown until completion.
99    //
100    // Note: Because Shutdown() may block while these tasks are executing,
101    // care must be taken to ensure that they do not block on the thread that
102    // called Shutdown(), as this may lead to deadlock.
103    SKIP_ON_SHUTDOWN,
104
105    // Tasks posted with this mode will block shutdown until they're
106    // executed. Since this can have significant performance implications,
107    // use sparingly.
108    //
109    // Generally, this should be used only for user data, for example, a task
110    // writing a preference file.
111    //
112    // If a task is posted during shutdown, it will not get run since the
113    // workers may already be stopped. In this case, the post operation will
114    // fail (return false) and the task will be deleted.
115    BLOCK_SHUTDOWN,
116  };
117
118  // Opaque identifier that defines sequencing of tasks posted to the worker
119  // pool.
120  class SequenceToken {
121   public:
122    SequenceToken() : id_(0) {}
123    ~SequenceToken() {}
124
125    bool Equals(const SequenceToken& other) const {
126      return id_ == other.id_;
127    }
128
129    // Returns false if current thread is executing an unsequenced task.
130    bool IsValid() const {
131      return id_ != 0;
132    }
133
134   private:
135    friend class SequencedWorkerPool;
136
137    explicit SequenceToken(int id) : id_(id) {}
138
139    int id_;
140  };
141
142  // Allows tests to perform certain actions.
143  class TestingObserver {
144   public:
145    virtual ~TestingObserver() {}
146    virtual void OnHasWork() = 0;
147    virtual void WillWaitForShutdown() = 0;
148    virtual void OnDestruct() = 0;
149  };
150
151  // Gets the SequencedToken of the current thread.
152  // If current thread is not a SequencedWorkerPool worker thread or is running
153  // an unsequenced task, returns an invalid SequenceToken.
154  static SequenceToken GetSequenceTokenForCurrentThread();
155
156  // When constructing a SequencedWorkerPool, there must be a
157  // MessageLoop on the current thread unless you plan to deliberately
158  // leak it.
159
160  // Pass the maximum number of threads (they will be lazily created as needed)
161  // and a prefix for the thread name to aid in debugging.
162  SequencedWorkerPool(size_t max_threads,
163                      const std::string& thread_name_prefix);
164
165  // Like above, but with |observer| for testing.  Does not take
166  // ownership of |observer|.
167  SequencedWorkerPool(size_t max_threads,
168                      const std::string& thread_name_prefix,
169                      TestingObserver* observer);
170
171  // Returns a unique token that can be used to sequence tasks posted to
172  // PostSequencedWorkerTask(). Valid tokens are always nonzero.
173  SequenceToken GetSequenceToken();
174
175  // Returns the sequence token associated with the given name. Calling this
176  // function multiple times with the same string will always produce the
177  // same sequence token. If the name has not been used before, a new token
178  // will be created.
179  SequenceToken GetNamedSequenceToken(const std::string& name);
180
181  // Returns a SequencedTaskRunner wrapper which posts to this
182  // SequencedWorkerPool using the given sequence token. Tasks with nonzero
183  // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
184  // are posted with BLOCK_SHUTDOWN behavior.
185  scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
186      SequenceToken token);
187
188  // Returns a SequencedTaskRunner wrapper which posts to this
189  // SequencedWorkerPool using the given sequence token. Tasks with nonzero
190  // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
191  // are posted with the given shutdown behavior.
192  scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
193      SequenceToken token,
194      WorkerShutdown shutdown_behavior);
195
196  // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
197  // the given shutdown behavior. Tasks with nonzero delay are posted with
198  // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
199  // given shutdown behavior.
200  scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
201      WorkerShutdown shutdown_behavior);
202
203  // Posts the given task for execution in the worker pool. Tasks posted with
204  // this function will execute in an unspecified order on a background thread.
205  // Returns true if the task was posted. If your tasks have ordering
206  // requirements, see PostSequencedWorkerTask().
207  //
208  // This class will attempt to delete tasks that aren't run
209  // (non-block-shutdown semantics) but can't guarantee that this happens. If
210  // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there
211  // will be no workers available to delete these tasks. And there may be
212  // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN
213  // tasks. Deleting those tasks before the previous one has completed could
214  // cause nondeterministic crashes because the task could be keeping some
215  // objects alive which do work in their destructor, which could voilate the
216  // assumptions of the running task.
217  //
218  // The task will be guaranteed to run to completion before shutdown
219  // (BLOCK_SHUTDOWN semantics).
220  //
221  // Returns true if the task was posted successfully. This may fail during
222  // shutdown regardless of the specified ShutdownBehavior.
223  bool PostWorkerTask(const tracked_objects::Location& from_here,
224                      const Closure& task);
225
226  // Same as PostWorkerTask but allows a delay to be specified (although doing
227  // so changes the shutdown behavior). The task will be run after the given
228  // delay has elapsed.
229  //
230  // If the delay is nonzero, the task won't be guaranteed to run to completion
231  // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
232  // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the
233  // task will be guaranteed to run to completion before shutdown
234  // (BLOCK_SHUTDOWN semantics).
235  bool PostDelayedWorkerTask(const tracked_objects::Location& from_here,
236                             const Closure& task,
237                             TimeDelta delay);
238
239  // Same as PostWorkerTask but allows specification of the shutdown behavior.
240  bool PostWorkerTaskWithShutdownBehavior(
241      const tracked_objects::Location& from_here,
242      const Closure& task,
243      WorkerShutdown shutdown_behavior);
244
245  // Like PostWorkerTask above, but provides sequencing semantics. This means
246  // that tasks posted with the same sequence token (see GetSequenceToken())
247  // are guaranteed to execute in order. This is useful in cases where you're
248  // doing operations that may depend on previous ones, like appending to a
249  // file.
250  //
251  // The task will be guaranteed to run to completion before shutdown
252  // (BLOCK_SHUTDOWN semantics).
253  //
254  // Returns true if the task was posted successfully. This may fail during
255  // shutdown regardless of the specified ShutdownBehavior.
256  bool PostSequencedWorkerTask(SequenceToken sequence_token,
257                               const tracked_objects::Location& from_here,
258                               const Closure& task);
259
260  // Like PostSequencedWorkerTask above, but allows you to specify a named
261  // token, which saves an extra call to GetNamedSequenceToken.
262  bool PostNamedSequencedWorkerTask(const std::string& token_name,
263                                    const tracked_objects::Location& from_here,
264                                    const Closure& task);
265
266  // Same as PostSequencedWorkerTask but allows a delay to be specified
267  // (although doing so changes the shutdown behavior). The task will be run
268  // after the given delay has elapsed.
269  //
270  // If the delay is nonzero, the task won't be guaranteed to run to completion
271  // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
272  // If the delay is zero, this behaves exactly like PostSequencedWorkerTask,
273  // i.e. the task will be guaranteed to run to completion before shutdown
274  // (BLOCK_SHUTDOWN semantics).
275  bool PostDelayedSequencedWorkerTask(
276      SequenceToken sequence_token,
277      const tracked_objects::Location& from_here,
278      const Closure& task,
279      TimeDelta delay);
280
281  // Same as PostSequencedWorkerTask but allows specification of the shutdown
282  // behavior.
283  bool PostSequencedWorkerTaskWithShutdownBehavior(
284      SequenceToken sequence_token,
285      const tracked_objects::Location& from_here,
286      const Closure& task,
287      WorkerShutdown shutdown_behavior);
288
289  // TaskRunner implementation. Forwards to PostDelayedWorkerTask().
290  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
291                               const Closure& task,
292                               TimeDelta delay) OVERRIDE;
293  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
294
295  // Returns true if the current thread is processing a task with the given
296  // sequence_token.
297  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
298
299  // Blocks until all pending tasks are complete. This should only be called in
300  // unit tests when you want to validate something that should have happened.
301  // This will not flush delayed tasks; delayed tasks get deleted.
302  //
303  // Note that calling this will not prevent other threads from posting work to
304  // the queue while the calling thread is waiting on Flush(). In this case,
305  // Flush will return only when there's no more work in the queue. Normally,
306  // this doesn't come up since in a test, all the work is being posted from
307  // the main thread.
308  void FlushForTesting();
309
310  // Spuriously signal that there is work to be done.
311  void SignalHasWorkForTesting();
312
313  // Implements the worker pool shutdown. This should be called during app
314  // shutdown, and will discard/join with appropriate tasks before returning.
315  // After this call, subsequent calls to post tasks will fail.
316  //
317  // Must be called from the same thread this object was constructed on.
318  void Shutdown() { Shutdown(0); }
319
320  // A variant that allows an arbitrary number of new blocking tasks to
321  // be posted during shutdown from within tasks that execute during shutdown.
322  // Only tasks designated as BLOCKING_SHUTDOWN will be allowed, and only if
323  // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once
324  // the limit is reached, subsequent calls to post task fail in all cases.
325  //
326  // Must be called from the same thread this object was constructed on.
327  void Shutdown(int max_new_blocking_tasks_after_shutdown);
328
329  // Check if Shutdown was called for given threading pool. This method is used
330  // for aborting time consuming operation to avoid blocking shutdown.
331  //
332  // Can be called from any thread.
333  bool IsShutdownInProgress();
334
335 protected:
336  virtual ~SequencedWorkerPool();
337
338  virtual void OnDestruct() const OVERRIDE;
339
340 private:
341  friend class RefCountedThreadSafe<SequencedWorkerPool>;
342  friend class DeleteHelper<SequencedWorkerPool>;
343
344  class Inner;
345  class Worker;
346
347  const scoped_refptr<MessageLoopProxy> constructor_message_loop_;
348
349  // Avoid pulling in too many headers by putting (almost) everything
350  // into |inner_|.
351  const scoped_ptr<Inner> inner_;
352
353  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
354};
355
356}  // namespace base
357
358#endif  // BASE_THREADING_SEQUENCED_WORKER_POOL_H_
359