1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
6#define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
7
8#include <cstddef>
9#include <string>
10
11#include "base/base_export.h"
12#include "base/basictypes.h"
13#include "base/callback_forward.h"
14#include "base/memory/ref_counted.h"
15#include "base/memory/scoped_ptr.h"
16#include "base/task_runner.h"
17
18namespace tracked_objects {
19class Location;
20}  // namespace tracked_objects
21
22namespace base {
23
24class MessageLoopProxy;
25
26template <class T> class DeleteHelper;
27
28class SequencedTaskRunner;
29
30// A worker thread pool that enforces ordering between sets of tasks. It also
31// allows you to specify what should happen to your tasks on shutdown.
32//
33// To enforce ordering, get a unique sequence token from the pool and post all
34// tasks you want to order with the token. All tasks with the same token are
35// guaranteed to execute serially, though not necessarily on the same thread.
36// This means that:
37//
38//   - No two tasks with the same token will run at the same time.
39//
40//   - Given two tasks T1 and T2 with the same token such that T2 will
41//     run after T1, then T2 will start after T1 is destroyed.
42//
43//   - If T2 will run after T1, then all memory changes in T1 and T1's
44//     destruction will be visible to T2.
45//
46// Example:
47//   SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken();
48//   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
49//                                FROM_HERE, base::Bind(...));
50//   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
51//                                FROM_HERE, base::Bind(...));
52//
53// You can make named sequence tokens to make it easier to share a token
54// across different components.
55//
56// You can also post tasks to the pool without ordering using PostWorkerTask.
57// These will be executed in an unspecified order. The order of execution
58// between tasks with different sequence tokens is also unspecified.
59//
60// This class may be leaked on shutdown to facilitate fast shutdown. The
61// expected usage, however, is to call Shutdown(), which correctly accounts
62// for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
63// behavior.
64//
65// Implementation note: This does not use a base::WorkerPool since that does
66// not enforce shutdown semantics or allow us to specify how many worker
67// threads to run. For the typical use case of random background work, we don't
68// necessarily want to be super aggressive about creating threads.
69//
70// Note that SequencedWorkerPool is RefCountedThreadSafe (inherited
71// from TaskRunner).
72//
73// Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid
74// memory leaks. See http://crbug.com/273800
75class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
76 public:
77  // Defines what should happen to a task posted to the worker pool on
78  // shutdown.
79  enum WorkerShutdown {
80    // Tasks posted with this mode which have not run at shutdown will be
81    // deleted rather than run, and any tasks with this mode running at
82    // shutdown will be ignored (the worker thread will not be joined).
83    //
84    // This option provides a nice way to post stuff you don't want blocking
85    // shutdown. For example, you might be doing a slow DNS lookup and if it's
86    // blocked on the OS, you may not want to stop shutdown, since the result
87    // doesn't really matter at that point.
88    //
89    // However, you need to be very careful what you do in your callback when
90    // you use this option. Since the thread will continue to run until the OS
91    // terminates the process, the app can be in the process of tearing down
92    // when you're running. This means any singletons or global objects you
93    // use may suddenly become invalid out from under you. For this reason,
94    // it's best to use this only for slow but simple operations like the DNS
95    // example.
96    CONTINUE_ON_SHUTDOWN,
97
98    // Tasks posted with this mode that have not started executing at
99    // shutdown will be deleted rather than executed. However, any tasks that
100    // have already begun executing when shutdown is called will be allowed
101    // to continue, and will block shutdown until completion.
102    //
103    // Note: Because Shutdown() may block while these tasks are executing,
104    // care must be taken to ensure that they do not block on the thread that
105    // called Shutdown(), as this may lead to deadlock.
106    SKIP_ON_SHUTDOWN,
107
108    // Tasks posted with this mode will block shutdown until they're
109    // executed. Since this can have significant performance implications,
110    // use sparingly.
111    //
112    // Generally, this should be used only for user data, for example, a task
113    // writing a preference file.
114    //
115    // If a task is posted during shutdown, it will not get run since the
116    // workers may already be stopped. In this case, the post operation will
117    // fail (return false) and the task will be deleted.
118    BLOCK_SHUTDOWN,
119  };
120
121  // Opaque identifier that defines sequencing of tasks posted to the worker
122  // pool.
123  class SequenceToken {
124   public:
125    SequenceToken() : id_(0) {}
126    ~SequenceToken() {}
127
128    bool Equals(const SequenceToken& other) const {
129      return id_ == other.id_;
130    }
131
132    // Returns false if current thread is executing an unsequenced task.
133    bool IsValid() const {
134      return id_ != 0;
135    }
136
137   private:
138    friend class SequencedWorkerPool;
139
140    explicit SequenceToken(int id) : id_(id) {}
141
142    int id_;
143  };
144
  // Allows tests to perform certain actions. An observer is handed to the pool
  // through the three-argument SequencedWorkerPool constructor, which does not
  // take ownership of it. All hooks are pure virtual, so a test must implement
  // every one of them.
  class TestingObserver {
   public:
    virtual ~TestingObserver() {}
    // NOTE(review): presumably invoked when the pool signals that it has work
    // (compare SignalHasWorkForTesting()) -- confirm against the
    // implementation.
    virtual void OnHasWork() = 0;
    // NOTE(review): presumably invoked before Shutdown() begins waiting --
    // confirm against the implementation.
    virtual void WillWaitForShutdown() = 0;
    // NOTE(review): presumably invoked as the pool is being destroyed --
    // confirm against the implementation.
    virtual void OnDestruct() = 0;
  };
153
  // Gets the SequenceToken of the current thread.
155  // If current thread is not a SequencedWorkerPool worker thread or is running
156  // an unsequenced task, returns an invalid SequenceToken.
157  static SequenceToken GetSequenceTokenForCurrentThread();
158
159  // When constructing a SequencedWorkerPool, there must be a
160  // MessageLoop on the current thread unless you plan to deliberately
161  // leak it.
162
163  // Pass the maximum number of threads (they will be lazily created as needed)
164  // and a prefix for the thread name to aid in debugging.
165  SequencedWorkerPool(size_t max_threads,
166                      const std::string& thread_name_prefix);
167
168  // Like above, but with |observer| for testing.  Does not take
169  // ownership of |observer|.
170  SequencedWorkerPool(size_t max_threads,
171                      const std::string& thread_name_prefix,
172                      TestingObserver* observer);
173
174  // Returns a unique token that can be used to sequence tasks posted to
175  // PostSequencedWorkerTask(). Valid tokens are always nonzero.
176  SequenceToken GetSequenceToken();
177
178  // Returns the sequence token associated with the given name. Calling this
179  // function multiple times with the same string will always produce the
180  // same sequence token. If the name has not been used before, a new token
181  // will be created.
182  SequenceToken GetNamedSequenceToken(const std::string& name);
183
184  // Returns a SequencedTaskRunner wrapper which posts to this
185  // SequencedWorkerPool using the given sequence token. Tasks with nonzero
186  // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
187  // are posted with BLOCK_SHUTDOWN behavior.
188  scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
189      SequenceToken token);
190
191  // Returns a SequencedTaskRunner wrapper which posts to this
192  // SequencedWorkerPool using the given sequence token. Tasks with nonzero
193  // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
194  // are posted with the given shutdown behavior.
195  scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
196      SequenceToken token,
197      WorkerShutdown shutdown_behavior);
198
199  // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
200  // the given shutdown behavior. Tasks with nonzero delay are posted with
201  // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
202  // given shutdown behavior.
203  scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
204      WorkerShutdown shutdown_behavior);
205
206  // Posts the given task for execution in the worker pool. Tasks posted with
207  // this function will execute in an unspecified order on a background thread.
208  // Returns true if the task was posted. If your tasks have ordering
209  // requirements, see PostSequencedWorkerTask().
210  //
211  // This class will attempt to delete tasks that aren't run
212  // (non-block-shutdown semantics) but can't guarantee that this happens. If
213  // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there
214  // will be no workers available to delete these tasks. And there may be
215  // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN
216  // tasks. Deleting those tasks before the previous one has completed could
217  // cause nondeterministic crashes because the task could be keeping some
  // objects alive which do work in their destructor, which could violate the
219  // assumptions of the running task.
220  //
221  // The task will be guaranteed to run to completion before shutdown
222  // (BLOCK_SHUTDOWN semantics).
223  //
  // Returns true if the task was posted successfully. This may fail during
  // shutdown regardless of the task's WorkerShutdown behavior.
226  bool PostWorkerTask(const tracked_objects::Location& from_here,
227                      const Closure& task);
228
229  // Same as PostWorkerTask but allows a delay to be specified (although doing
230  // so changes the shutdown behavior). The task will be run after the given
231  // delay has elapsed.
232  //
233  // If the delay is nonzero, the task won't be guaranteed to run to completion
234  // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
235  // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the
236  // task will be guaranteed to run to completion before shutdown
237  // (BLOCK_SHUTDOWN semantics).
238  bool PostDelayedWorkerTask(const tracked_objects::Location& from_here,
239                             const Closure& task,
240                             TimeDelta delay);
241
242  // Same as PostWorkerTask but allows specification of the shutdown behavior.
243  bool PostWorkerTaskWithShutdownBehavior(
244      const tracked_objects::Location& from_here,
245      const Closure& task,
246      WorkerShutdown shutdown_behavior);
247
248  // Like PostWorkerTask above, but provides sequencing semantics. This means
249  // that tasks posted with the same sequence token (see GetSequenceToken())
250  // are guaranteed to execute in order. This is useful in cases where you're
251  // doing operations that may depend on previous ones, like appending to a
252  // file.
253  //
254  // The task will be guaranteed to run to completion before shutdown
255  // (BLOCK_SHUTDOWN semantics).
256  //
  // Returns true if the task was posted successfully. This may fail during
  // shutdown regardless of the task's WorkerShutdown behavior.
259  bool PostSequencedWorkerTask(SequenceToken sequence_token,
260                               const tracked_objects::Location& from_here,
261                               const Closure& task);
262
263  // Like PostSequencedWorkerTask above, but allows you to specify a named
264  // token, which saves an extra call to GetNamedSequenceToken.
265  bool PostNamedSequencedWorkerTask(const std::string& token_name,
266                                    const tracked_objects::Location& from_here,
267                                    const Closure& task);
268
269  // Same as PostSequencedWorkerTask but allows a delay to be specified
270  // (although doing so changes the shutdown behavior). The task will be run
271  // after the given delay has elapsed.
272  //
273  // If the delay is nonzero, the task won't be guaranteed to run to completion
274  // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
275  // If the delay is zero, this behaves exactly like PostSequencedWorkerTask,
276  // i.e. the task will be guaranteed to run to completion before shutdown
277  // (BLOCK_SHUTDOWN semantics).
278  bool PostDelayedSequencedWorkerTask(
279      SequenceToken sequence_token,
280      const tracked_objects::Location& from_here,
281      const Closure& task,
282      TimeDelta delay);
283
284  // Same as PostSequencedWorkerTask but allows specification of the shutdown
285  // behavior.
286  bool PostSequencedWorkerTaskWithShutdownBehavior(
287      SequenceToken sequence_token,
288      const tracked_objects::Location& from_here,
289      const Closure& task,
290      WorkerShutdown shutdown_behavior);
291
292  // TaskRunner implementation. Forwards to PostDelayedWorkerTask().
293  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
294                               const Closure& task,
295                               TimeDelta delay) OVERRIDE;
296  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
297
298  // Returns true if the current thread is processing a task with the given
299  // sequence_token.
300  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
301
302  // Blocks until all pending tasks are complete. This should only be called in
303  // unit tests when you want to validate something that should have happened.
304  // This will not flush delayed tasks; delayed tasks get deleted.
305  //
  // Note that calling this will not prevent other threads from posting work to
  // the queue while the calling thread is waiting on FlushForTesting(). In this
  // case, FlushForTesting() will return only when there's no more work in the
  // queue. Normally, this doesn't come up since in a test, all the work is
  // being posted from the main thread.
311  void FlushForTesting();
312
313  // Spuriously signal that there is work to be done.
314  void SignalHasWorkForTesting();
315
316  // Implements the worker pool shutdown. This should be called during app
317  // shutdown, and will discard/join with appropriate tasks before returning.
318  // After this call, subsequent calls to post tasks will fail.
319  //
320  // Must be called from the same thread this object was constructed on.
321  void Shutdown() { Shutdown(0); }
322
323  // A variant that allows an arbitrary number of new blocking tasks to
324  // be posted during shutdown from within tasks that execute during shutdown.
  // Only tasks designated as BLOCK_SHUTDOWN will be allowed, and only if
326  // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once
327  // the limit is reached, subsequent calls to post task fail in all cases.
328  //
329  // Must be called from the same thread this object was constructed on.
330  void Shutdown(int max_new_blocking_tasks_after_shutdown);
331
  // Checks whether Shutdown() has been called on this pool. Useful for
  // aborting time-consuming operations so that they do not block shutdown.
334  //
335  // Can be called from any thread.
336  bool IsShutdownInProgress();
337
338 protected:
339  virtual ~SequencedWorkerPool();
340
341  virtual void OnDestruct() const OVERRIDE;
342
343 private:
344  friend class RefCountedThreadSafe<SequencedWorkerPool>;
345  friend class DeleteHelper<SequencedWorkerPool>;
346
347  class Inner;
348  class Worker;
349
350  const scoped_refptr<MessageLoopProxy> constructor_message_loop_;
351
352  // Avoid pulling in too many headers by putting (almost) everything
353  // into |inner_|.
354  const scoped_ptr<Inner> inner_;
355
356  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
357};
358
359}  // namespace base
360
361#endif  // BASE_THREADING_SEQUENCED_WORKER_POOL_H_
362