1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/sequenced_worker_pool.h"
6
7#include <stdint.h>
8
9#include <list>
10#include <map>
11#include <set>
12#include <utility>
13#include <vector>
14
15#include "base/atomic_sequence_num.h"
16#include "base/callback.h"
17#include "base/compiler_specific.h"
18#include "base/critical_closure.h"
19#include "base/lazy_instance.h"
20#include "base/logging.h"
21#include "base/macros.h"
22#include "base/memory/linked_ptr.h"
23#include "base/stl_util.h"
24#include "base/strings/stringprintf.h"
25#include "base/synchronization/condition_variable.h"
26#include "base/synchronization/lock.h"
27#include "base/thread_task_runner_handle.h"
28#include "base/threading/platform_thread.h"
29#include "base/threading/simple_thread.h"
30#include "base/threading/thread_local.h"
31#include "base/threading/thread_restrictions.h"
32#include "base/time/time.h"
33#include "base/trace_event/trace_event.h"
34#include "base/tracked_objects.h"
35#include "build/build_config.h"
36
37#if defined(OS_MACOSX)
38#include "base/mac/scoped_nsautorelease_pool.h"
39#elif defined(OS_WIN)
40#include "base/win/scoped_com_initializer.h"
41#endif
42
43#if !defined(OS_NACL)
44#include "base/metrics/histogram.h"
45#endif
46
47namespace base {
48
49namespace {
50
51struct SequencedTask : public TrackingInfo  {
52  SequencedTask()
53      : sequence_token_id(0),
54        trace_id(0),
55        sequence_task_number(0),
56        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
57
58  explicit SequencedTask(const tracked_objects::Location& from_here)
59      : base::TrackingInfo(from_here, TimeTicks()),
60        sequence_token_id(0),
61        trace_id(0),
62        sequence_task_number(0),
63        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
64
65  ~SequencedTask() {}
66
67  int sequence_token_id;
68  int trace_id;
69  int64_t sequence_task_number;
70  SequencedWorkerPool::WorkerShutdown shutdown_behavior;
71  tracked_objects::Location posted_from;
72  Closure task;
73
74  // Non-delayed tasks and delayed tasks are managed together by time-to-run
75  // order. We calculate the time by adding the posted time and the given delay.
76  TimeTicks time_to_run;
77};
78
79struct SequencedTaskLessThan {
80 public:
81  bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
82    if (lhs.time_to_run < rhs.time_to_run)
83      return true;
84
85    if (lhs.time_to_run > rhs.time_to_run)
86      return false;
87
88    // If the time happen to match, then we use the sequence number to decide.
89    return lhs.sequence_task_number < rhs.sequence_task_number;
90  }
91};
92
93// SequencedWorkerPoolTaskRunner ---------------------------------------------
94// A TaskRunner which posts tasks to a SequencedWorkerPool with a
95// fixed ShutdownBehavior.
96//
97// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
98class SequencedWorkerPoolTaskRunner : public TaskRunner {
99 public:
100  SequencedWorkerPoolTaskRunner(
101      const scoped_refptr<SequencedWorkerPool>& pool,
102      SequencedWorkerPool::WorkerShutdown shutdown_behavior);
103
104  // TaskRunner implementation
105  bool PostDelayedTask(const tracked_objects::Location& from_here,
106                       const Closure& task,
107                       TimeDelta delay) override;
108  bool RunsTasksOnCurrentThread() const override;
109
110 private:
111  ~SequencedWorkerPoolTaskRunner() override;
112
113  const scoped_refptr<SequencedWorkerPool> pool_;
114
115  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
116
117  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
118};
119
120SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
121    const scoped_refptr<SequencedWorkerPool>& pool,
122    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
123    : pool_(pool),
124      shutdown_behavior_(shutdown_behavior) {
125}
126
127SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
128}
129
130bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
131    const tracked_objects::Location& from_here,
132    const Closure& task,
133    TimeDelta delay) {
134  if (delay == TimeDelta()) {
135    return pool_->PostWorkerTaskWithShutdownBehavior(
136        from_here, task, shutdown_behavior_);
137  }
138  return pool_->PostDelayedWorkerTask(from_here, task, delay);
139}
140
141bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
142  return pool_->RunsTasksOnCurrentThread();
143}
144
145// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
146// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
147// fixed sequence token.
148//
149// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
150class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
151 public:
152  SequencedWorkerPoolSequencedTaskRunner(
153      const scoped_refptr<SequencedWorkerPool>& pool,
154      SequencedWorkerPool::SequenceToken token,
155      SequencedWorkerPool::WorkerShutdown shutdown_behavior);
156
157  // TaskRunner implementation
158  bool PostDelayedTask(const tracked_objects::Location& from_here,
159                       const Closure& task,
160                       TimeDelta delay) override;
161  bool RunsTasksOnCurrentThread() const override;
162
163  // SequencedTaskRunner implementation
164  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
165                                  const Closure& task,
166                                  TimeDelta delay) override;
167
168 private:
169  ~SequencedWorkerPoolSequencedTaskRunner() override;
170
171  const scoped_refptr<SequencedWorkerPool> pool_;
172
173  const SequencedWorkerPool::SequenceToken token_;
174
175  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
176
177  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
178};
179
180SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
181    const scoped_refptr<SequencedWorkerPool>& pool,
182    SequencedWorkerPool::SequenceToken token,
183    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
184    : pool_(pool),
185      token_(token),
186      shutdown_behavior_(shutdown_behavior) {
187}
188
189SequencedWorkerPoolSequencedTaskRunner::
190~SequencedWorkerPoolSequencedTaskRunner() {
191}
192
193bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
194    const tracked_objects::Location& from_here,
195    const Closure& task,
196    TimeDelta delay) {
197  if (delay == TimeDelta()) {
198    return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
199        token_, from_here, task, shutdown_behavior_);
200  }
201  return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
202}
203
204bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
205  return pool_->IsRunningSequenceOnCurrentThread(token_);
206}
207
208bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
209    const tracked_objects::Location& from_here,
210    const Closure& task,
211    TimeDelta delay) {
212  // There's no way to run nested tasks, so simply forward to
213  // PostDelayedTask.
214  return PostDelayedTask(from_here, task, delay);
215}
216
217// Create a process-wide unique ID to represent this task in trace events. This
218// will be mangled with a Process ID hash to reduce the likelyhood of colliding
219// with MessageLoop pointers on other processes.
220uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) {
221  return (static_cast<uint64_t>(task.trace_id) << 32) |
222         static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool));
223}
224
225}  // namespace
226
227// Worker ---------------------------------------------------------------------
228
229class SequencedWorkerPool::Worker : public SimpleThread {
230 public:
231  // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
232  // around as long as we are running.
233  Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
234         int thread_number,
235         const std::string& thread_name_prefix);
236  ~Worker() override;
237
238  // SimpleThread implementation. This actually runs the background thread.
239  void Run() override;
240
241  // Gets the worker for the current thread out of thread-local storage.
242  static Worker* GetForCurrentThread();
243
244  // Indicates that a task is about to be run. The parameters provide
245  // additional metainformation about the task being run.
246  void set_running_task_info(SequenceToken token,
247                             WorkerShutdown shutdown_behavior) {
248    is_processing_task_ = true;
249    task_sequence_token_ = token;
250    task_shutdown_behavior_ = shutdown_behavior;
251  }
252
253  // Indicates that the task has finished running.
254  void reset_running_task_info() { is_processing_task_ = false; }
255
256  // Whether the worker is processing a task.
257  bool is_processing_task() { return is_processing_task_; }
258
259  SequenceToken task_sequence_token() const {
260    DCHECK(is_processing_task_);
261    return task_sequence_token_;
262  }
263
264  WorkerShutdown task_shutdown_behavior() const {
265    DCHECK(is_processing_task_);
266    return task_shutdown_behavior_;
267  }
268
269  scoped_refptr<SequencedWorkerPool> worker_pool() const {
270    return worker_pool_;
271  }
272
273 private:
274  static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
275      lazy_tls_ptr_;
276
277  scoped_refptr<SequencedWorkerPool> worker_pool_;
278  // The sequence token of the task being processed. Only valid when
279  // is_processing_task_ is true.
280  SequenceToken task_sequence_token_;
281  // The shutdown behavior of the task being processed. Only valid when
282  // is_processing_task_ is true.
283  WorkerShutdown task_shutdown_behavior_;
284  // Whether the Worker is processing a task.
285  bool is_processing_task_;
286
287  DISALLOW_COPY_AND_ASSIGN(Worker);
288};
289
290// Inner ----------------------------------------------------------------------
291
292class SequencedWorkerPool::Inner {
293 public:
294  // Take a raw pointer to |worker| to avoid cycles (since we're owned
295  // by it).
296  Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
297        const std::string& thread_name_prefix,
298        TestingObserver* observer);
299
300  ~Inner();
301
302  static SequenceToken GetSequenceToken();
303
304  SequenceToken GetNamedSequenceToken(const std::string& name);
305
306  // This function accepts a name and an ID. If the name is null, the
307  // token ID is used. This allows us to implement the optional name lookup
308  // from a single function without having to enter the lock a separate time.
309  bool PostTask(const std::string* optional_token_name,
310                SequenceToken sequence_token,
311                WorkerShutdown shutdown_behavior,
312                const tracked_objects::Location& from_here,
313                const Closure& task,
314                TimeDelta delay);
315
316  bool RunsTasksOnCurrentThread() const;
317
318  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
319
320  bool IsRunningSequence(SequenceToken sequence_token) const;
321
322  void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
323                                          WorkerShutdown shutdown_behavior);
324
325  void CleanupForTesting();
326
327  void SignalHasWorkForTesting();
328
329  int GetWorkSignalCountForTesting() const;
330
331  void Shutdown(int max_blocking_tasks_after_shutdown);
332
333  bool IsShutdownInProgress();
334
335  // Runs the worker loop on the background thread.
336  void ThreadLoop(Worker* this_worker);
337
338 private:
339  enum GetWorkStatus {
340    GET_WORK_FOUND,
341    GET_WORK_NOT_FOUND,
342    GET_WORK_WAIT,
343  };
344
345  enum CleanupState {
346    CLEANUP_REQUESTED,
347    CLEANUP_STARTING,
348    CLEANUP_RUNNING,
349    CLEANUP_FINISHING,
350    CLEANUP_DONE,
351  };
352
353  // Called from within the lock, this converts the given token name into a
354  // token ID, creating a new one if necessary.
355  int LockedGetNamedTokenID(const std::string& name);
356
357  // Called from within the lock, this returns the next sequence task number.
358  int64_t LockedGetNextSequenceTaskNumber();
359
360  // Gets new task. There are 3 cases depending on the return value:
361  //
362  // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
363  //    be run immediately.
364  // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
365  //    and |task| is not filled in. In this case, the caller should wait until
366  //    a task is posted.
367  // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
368  //    immediately, and |task| is not filled in. Likewise, |wait_time| is
369  //    filled in the time to wait until the next task to run. In this case, the
370  //    caller should wait the time.
371  //
372  // In any case, the calling code should clear the given
373  // delete_these_outside_lock vector the next time the lock is released.
374  // See the implementation for a more detailed description.
375  GetWorkStatus GetWork(SequencedTask* task,
376                        TimeDelta* wait_time,
377                        std::vector<Closure>* delete_these_outside_lock);
378
379  void HandleCleanup();
380
381  // Peforms init and cleanup around running the given task. WillRun...
382  // returns the value from PrepareToStartAdditionalThreadIfNecessary.
383  // The calling code should call FinishStartingAdditionalThread once the
384  // lock is released if the return values is nonzero.
385  int WillRunWorkerTask(const SequencedTask& task);
386  void DidRunWorkerTask(const SequencedTask& task);
387
388  // Returns true if there are no threads currently running the given
389  // sequence token.
390  bool IsSequenceTokenRunnable(int sequence_token_id) const;
391
392  // Checks if all threads are busy and the addition of one more could run an
393  // additional task waiting in the queue. This must be called from within
394  // the lock.
395  //
396  // If another thread is helpful, this will mark the thread as being in the
397  // process of starting and returns the index of the new thread which will be
398  // 0 or more. The caller should then call FinishStartingAdditionalThread to
399  // complete initialization once the lock is released.
400  //
401  // If another thread is not necessary, returne 0;
402  //
403  // See the implementedion for more.
404  int PrepareToStartAdditionalThreadIfHelpful();
405
406  // The second part of thread creation after
407  // PrepareToStartAdditionalThreadIfHelpful with the thread number it
408  // generated. This actually creates the thread and should be called outside
409  // the lock to avoid blocking important work starting a thread in the lock.
410  void FinishStartingAdditionalThread(int thread_number);
411
412  // Signal |has_work_| and increment |has_work_signal_count_|.
413  void SignalHasWork();
414
415  // Checks whether there is work left that's blocking shutdown. Must be
416  // called inside the lock.
417  bool CanShutdown() const;
418
419  SequencedWorkerPool* const worker_pool_;
420
421  // The last sequence number used. Managed by GetSequenceToken, since this
422  // only does threadsafe increment operations, you do not need to hold the
423  // lock. This is class-static to make SequenceTokens issued by
424  // GetSequenceToken unique across SequencedWorkerPool instances.
425  static base::StaticAtomicSequenceNumber g_last_sequence_number_;
426
427  // This lock protects |everything in this class|. Do not read or modify
428  // anything without holding this lock. Do not block while holding this
429  // lock.
430  mutable Lock lock_;
431
432  // Condition variable that is waited on by worker threads until new
433  // tasks are posted or shutdown starts.
434  ConditionVariable has_work_cv_;
435
436  // Condition variable that is waited on by non-worker threads (in
437  // Shutdown()) until CanShutdown() goes to true.
438  ConditionVariable can_shutdown_cv_;
439
440  // The maximum number of worker threads we'll create.
441  const size_t max_threads_;
442
443  const std::string thread_name_prefix_;
444
445  // Associates all known sequence token names with their IDs.
446  std::map<std::string, int> named_sequence_tokens_;
447
448  // Owning pointers to all threads we've created so far, indexed by
449  // ID. Since we lazily create threads, this may be less than
450  // max_threads_ and will be initially empty.
451  typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
452  ThreadMap threads_;
453
454  // Set to true when we're in the process of creating another thread.
455  // See PrepareToStartAdditionalThreadIfHelpful for more.
456  bool thread_being_created_;
457
458  // Number of threads currently waiting for work.
459  size_t waiting_thread_count_;
460
461  // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
462  // or SKIP_ON_SHUTDOWN flag set.
463  size_t blocking_shutdown_thread_count_;
464
465  // A set of all pending tasks in time-to-run order. These are tasks that are
466  // either waiting for a thread to run on, waiting for their time to run,
467  // or blocked on a previous task in their sequence. We have to iterate over
468  // the tasks by time-to-run order, so we use the set instead of the
469  // traditional priority_queue.
470  typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
471  PendingTaskSet pending_tasks_;
472
473  // The next sequence number for a new sequenced task.
474  int64_t next_sequence_task_number_;
475
476  // Number of tasks in the pending_tasks_ list that are marked as blocking
477  // shutdown.
478  size_t blocking_shutdown_pending_task_count_;
479
480  // Lists all sequence tokens currently executing.
481  std::set<int> current_sequences_;
482
483  // An ID for each posted task to distinguish the task from others in traces.
484  int trace_id_;
485
486  // Set when Shutdown is called and no further tasks should be
487  // allowed, though we may still be running existing tasks.
488  bool shutdown_called_;
489
490  // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown()
491  // has been called.
492  int max_blocking_tasks_after_shutdown_;
493
494  // State used to cleanup for testing, all guarded by lock_.
495  CleanupState cleanup_state_;
496  size_t cleanup_idlers_;
497  ConditionVariable cleanup_cv_;
498
499  TestingObserver* const testing_observer_;
500
501  DISALLOW_COPY_AND_ASSIGN(Inner);
502};
503
504// Worker definitions ---------------------------------------------------------
505
506SequencedWorkerPool::Worker::Worker(
507    const scoped_refptr<SequencedWorkerPool>& worker_pool,
508    int thread_number,
509    const std::string& prefix)
510    : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
511      worker_pool_(worker_pool),
512      task_shutdown_behavior_(BLOCK_SHUTDOWN),
513      is_processing_task_(false) {
514  Start();
515}
516
517SequencedWorkerPool::Worker::~Worker() {
518}
519
520void SequencedWorkerPool::Worker::Run() {
521#if defined(OS_WIN)
522  win::ScopedCOMInitializer com_initializer;
523#endif
524
525  // Store a pointer to this worker in thread local storage for static function
526  // access.
527  DCHECK(!lazy_tls_ptr_.Get().Get());
528  lazy_tls_ptr_.Get().Set(this);
529
530  // Just jump back to the Inner object to run the thread, since it has all the
531  // tracking information and queues. It might be more natural to implement
532  // using DelegateSimpleThread and have Inner implement the Delegate to avoid
533  // having these worker objects at all, but that method lacks the ability to
534  // send thread-specific information easily to the thread loop.
535  worker_pool_->inner_->ThreadLoop(this);
536  // Release our cyclic reference once we're done.
537  worker_pool_ = nullptr;
538}
539
540// static
541SequencedWorkerPool::Worker*
542SequencedWorkerPool::Worker::GetForCurrentThread() {
543  // Don't construct lazy instance on check.
544  if (lazy_tls_ptr_ == nullptr)
545    return nullptr;
546
547  return lazy_tls_ptr_.Get().Get();
548}
549
550// static
551LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
552    SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER;
553
554// Inner definitions ---------------------------------------------------------
555
556SequencedWorkerPool::Inner::Inner(
557    SequencedWorkerPool* worker_pool,
558    size_t max_threads,
559    const std::string& thread_name_prefix,
560    TestingObserver* observer)
561    : worker_pool_(worker_pool),
562      lock_(),
563      has_work_cv_(&lock_),
564      can_shutdown_cv_(&lock_),
565      max_threads_(max_threads),
566      thread_name_prefix_(thread_name_prefix),
567      thread_being_created_(false),
568      waiting_thread_count_(0),
569      blocking_shutdown_thread_count_(0),
570      next_sequence_task_number_(0),
571      blocking_shutdown_pending_task_count_(0),
572      trace_id_(0),
573      shutdown_called_(false),
574      max_blocking_tasks_after_shutdown_(0),
575      cleanup_state_(CLEANUP_DONE),
576      cleanup_idlers_(0),
577      cleanup_cv_(&lock_),
578      testing_observer_(observer) {}
579
580SequencedWorkerPool::Inner::~Inner() {
581  // You must call Shutdown() before destroying the pool.
582  DCHECK(shutdown_called_);
583
584  // Need to explicitly join with the threads before they're destroyed or else
585  // they will be running when our object is half torn down.
586  for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
587    it->second->Join();
588  threads_.clear();
589
590  if (testing_observer_)
591    testing_observer_->OnDestruct();
592}
593
594// static
595SequencedWorkerPool::SequenceToken
596SequencedWorkerPool::Inner::GetSequenceToken() {
597  // Need to add one because StaticAtomicSequenceNumber starts at zero, which
598  // is used as a sentinel value in SequenceTokens.
599  return SequenceToken(g_last_sequence_number_.GetNext() + 1);
600}
601
602SequencedWorkerPool::SequenceToken
603SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
604  AutoLock lock(lock_);
605  return SequenceToken(LockedGetNamedTokenID(name));
606}
607
608bool SequencedWorkerPool::Inner::PostTask(
609    const std::string* optional_token_name,
610    SequenceToken sequence_token,
611    WorkerShutdown shutdown_behavior,
612    const tracked_objects::Location& from_here,
613    const Closure& task,
614    TimeDelta delay) {
615  DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
616  SequencedTask sequenced(from_here);
617  sequenced.sequence_token_id = sequence_token.id_;
618  sequenced.shutdown_behavior = shutdown_behavior;
619  sequenced.posted_from = from_here;
620  sequenced.task =
621      shutdown_behavior == BLOCK_SHUTDOWN ?
622      base::MakeCriticalClosure(task) : task;
623  sequenced.time_to_run = TimeTicks::Now() + delay;
624
625  int create_thread_id = 0;
626  {
627    AutoLock lock(lock_);
628    if (shutdown_called_) {
629      // Don't allow a new task to be posted if it doesn't block shutdown.
630      if (shutdown_behavior != BLOCK_SHUTDOWN)
631        return false;
632
633      // If the current thread is running a task, and that task doesn't block
634      // shutdown, then it shouldn't be allowed to post any more tasks.
635      ThreadMap::const_iterator found =
636          threads_.find(PlatformThread::CurrentId());
637      if (found != threads_.end() && found->second->is_processing_task() &&
638          found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
639        return false;
640      }
641
642      if (max_blocking_tasks_after_shutdown_ <= 0) {
643        DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
644        return false;
645      }
646      max_blocking_tasks_after_shutdown_ -= 1;
647    }
648
649    // The trace_id is used for identifying the task in about:tracing.
650    sequenced.trace_id = trace_id_++;
651
652    TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
653        "SequencedWorkerPool::Inner::PostTask",
654        TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
655        TRACE_EVENT_FLAG_FLOW_OUT);
656
657    sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
658
659    // Now that we have the lock, apply the named token rules.
660    if (optional_token_name)
661      sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
662
663    pending_tasks_.insert(sequenced);
664    if (shutdown_behavior == BLOCK_SHUTDOWN)
665      blocking_shutdown_pending_task_count_++;
666
667    create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
668  }
669
670  // Actually start the additional thread or signal an existing one now that
671  // we're outside the lock.
672  if (create_thread_id)
673    FinishStartingAdditionalThread(create_thread_id);
674  else
675    SignalHasWork();
676
677  return true;
678}
679
680bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
681  AutoLock lock(lock_);
682  return ContainsKey(threads_, PlatformThread::CurrentId());
683}
684
685bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
686    SequenceToken sequence_token) const {
687  AutoLock lock(lock_);
688  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
689  if (found == threads_.end())
690    return false;
691  return found->second->is_processing_task() &&
692         sequence_token.Equals(found->second->task_sequence_token());
693}
694
695bool SequencedWorkerPool::Inner::IsRunningSequence(
696    SequenceToken sequence_token) const {
697  DCHECK(sequence_token.IsValid());
698  AutoLock lock(lock_);
699  return !IsSequenceTokenRunnable(sequence_token.id_);
700}
701
702void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
703    SequenceToken sequence_token,
704    WorkerShutdown shutdown_behavior) {
705  AutoLock lock(lock_);
706  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
707  DCHECK(found != threads_.end());
708  DCHECK(found->second->is_processing_task());
709  DCHECK(!found->second->task_sequence_token().IsValid());
710  found->second->set_running_task_info(sequence_token, shutdown_behavior);
711
712  // Mark the sequence token as in use.
713  bool success = current_sequences_.insert(sequence_token.id_).second;
714  DCHECK(success);
715}
716
717// See https://code.google.com/p/chromium/issues/detail?id=168415
718void SequencedWorkerPool::Inner::CleanupForTesting() {
719  DCHECK(!RunsTasksOnCurrentThread());
720  base::ThreadRestrictions::ScopedAllowWait allow_wait;
721  AutoLock lock(lock_);
722  CHECK_EQ(CLEANUP_DONE, cleanup_state_);
723  if (shutdown_called_)
724    return;
725  if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
726    return;
727  cleanup_state_ = CLEANUP_REQUESTED;
728  cleanup_idlers_ = 0;
729  has_work_cv_.Signal();
730  while (cleanup_state_ != CLEANUP_DONE)
731    cleanup_cv_.Wait();
732}
733
734void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
735  SignalHasWork();
736}
737
738void SequencedWorkerPool::Inner::Shutdown(
739    int max_new_blocking_tasks_after_shutdown) {
740  DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
741  {
742    AutoLock lock(lock_);
743    // Cleanup and Shutdown should not be called concurrently.
744    CHECK_EQ(CLEANUP_DONE, cleanup_state_);
745    if (shutdown_called_)
746      return;
747    shutdown_called_ = true;
748    max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
749
750    // Tickle the threads. This will wake up a waiting one so it will know that
751    // it can exit, which in turn will wake up any other waiting ones.
752    SignalHasWork();
753
754    // There are no pending or running tasks blocking shutdown, we're done.
755    if (CanShutdown())
756      return;
757  }
758
759  // If we're here, then something is blocking shutdown.  So wait for
760  // CanShutdown() to go to true.
761
762  if (testing_observer_)
763    testing_observer_->WillWaitForShutdown();
764
765#if !defined(OS_NACL)
766  TimeTicks shutdown_wait_begin = TimeTicks::Now();
767#endif
768
769  {
770    base::ThreadRestrictions::ScopedAllowWait allow_wait;
771    AutoLock lock(lock_);
772    while (!CanShutdown())
773      can_shutdown_cv_.Wait();
774  }
775#if !defined(OS_NACL)
776  UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
777                      TimeTicks::Now() - shutdown_wait_begin);
778#endif
779}
780
781bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
782    AutoLock lock(lock_);
783    return shutdown_called_;
784}
785
786void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
787  {
788    AutoLock lock(lock_);
789    DCHECK(thread_being_created_);
790    thread_being_created_ = false;
791    std::pair<ThreadMap::iterator, bool> result =
792        threads_.insert(
793            std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
794    DCHECK(result.second);
795
796    while (true) {
797#if defined(OS_MACOSX)
798      base::mac::ScopedNSAutoreleasePool autorelease_pool;
799#endif
800
801      HandleCleanup();
802
803      // See GetWork for what delete_these_outside_lock is doing.
804      SequencedTask task;
805      TimeDelta wait_time;
806      std::vector<Closure> delete_these_outside_lock;
807      GetWorkStatus status =
808          GetWork(&task, &wait_time, &delete_these_outside_lock);
809      if (status == GET_WORK_FOUND) {
810        TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
811            "SequencedWorkerPool::Inner::ThreadLoop",
812            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
813            TRACE_EVENT_FLAG_FLOW_IN,
814            "src_file", task.posted_from.file_name(),
815            "src_func", task.posted_from.function_name());
816        int new_thread_id = WillRunWorkerTask(task);
817        {
818          AutoUnlock unlock(lock_);
819          // There may be more work available, so wake up another
820          // worker thread. (Technically not required, since we
821          // already get a signal for each new task, but it doesn't
822          // hurt.)
823          SignalHasWork();
824          delete_these_outside_lock.clear();
825
826          // Complete thread creation outside the lock if necessary.
827          if (new_thread_id)
828            FinishStartingAdditionalThread(new_thread_id);
829
830          this_worker->set_running_task_info(
831              SequenceToken(task.sequence_token_id), task.shutdown_behavior);
832
833          tracked_objects::TaskStopwatch stopwatch;
834          stopwatch.Start();
835          task.task.Run();
836          stopwatch.Stop();
837
838          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
839              task, stopwatch);
840
841          // Update the sequence token in case it has been set from within the
842          // task, so it can be removed from the set of currently running
843          // sequences in DidRunWorkerTask() below.
844          task.sequence_token_id = this_worker->task_sequence_token().id_;
845
846          // Make sure our task is erased outside the lock for the
847          // same reason we do this with delete_these_oustide_lock.
848          // Also, do it before calling reset_running_task_info() so
849          // that sequence-checking from within the task's destructor
850          // still works.
851          task.task = Closure();
852
853          this_worker->reset_running_task_info();
854        }
855        DidRunWorkerTask(task);  // Must be done inside the lock.
856      } else if (cleanup_state_ == CLEANUP_RUNNING) {
857        switch (status) {
858          case GET_WORK_WAIT: {
859              AutoUnlock unlock(lock_);
860              delete_these_outside_lock.clear();
861            }
862            break;
863          case GET_WORK_NOT_FOUND:
864            CHECK(delete_these_outside_lock.empty());
865            cleanup_state_ = CLEANUP_FINISHING;
866            cleanup_cv_.Broadcast();
867            break;
868          default:
869            NOTREACHED();
870        }
871      } else {
872        // When we're terminating and there's no more work, we can
873        // shut down, other workers can complete any pending or new tasks.
874        // We can get additional tasks posted after shutdown_called_ is set
875        // but only worker threads are allowed to post tasks at that time, and
876        // the workers responsible for posting those tasks will be available
877        // to run them. Also, there may be some tasks stuck behind running
878        // ones with the same sequence token, but additional threads won't
879        // help this case.
880        if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) {
881          AutoUnlock unlock(lock_);
882          delete_these_outside_lock.clear();
883          break;
884        }
885
886        // No work was found, but there are tasks that need deletion. The
887        // deletion must happen outside of the lock.
888        if (delete_these_outside_lock.size()) {
889          AutoUnlock unlock(lock_);
890          delete_these_outside_lock.clear();
891
892          // Since the lock has been released, |status| may no longer be
893          // accurate. It might read GET_WORK_WAIT even if there are tasks
894          // ready to perform work. Jump to the top of the loop to recalculate
895          // |status|.
896          continue;
897        }
898
899        waiting_thread_count_++;
900
901        switch (status) {
902          case GET_WORK_NOT_FOUND:
903            has_work_cv_.Wait();
904            break;
905          case GET_WORK_WAIT:
906            has_work_cv_.TimedWait(wait_time);
907            break;
908          default:
909            NOTREACHED();
910        }
911        waiting_thread_count_--;
912      }
913    }
914  }  // Release lock_.
915
916  // We noticed we should exit. Wake up the next worker so it knows it should
917  // exit as well (because the Shutdown() code only signals once).
918  SignalHasWork();
919
920  // Possibly unblock shutdown.
921  can_shutdown_cv_.Signal();
922}
923
924void SequencedWorkerPool::Inner::HandleCleanup() {
925  lock_.AssertAcquired();
926  if (cleanup_state_ == CLEANUP_DONE)
927    return;
928  if (cleanup_state_ == CLEANUP_REQUESTED) {
929    // We win, we get to do the cleanup as soon as the others wise up and idle.
930    cleanup_state_ = CLEANUP_STARTING;
931    while (thread_being_created_ ||
932           cleanup_idlers_ != threads_.size() - 1) {
933      has_work_cv_.Signal();
934      cleanup_cv_.Wait();
935    }
936    cleanup_state_ = CLEANUP_RUNNING;
937    return;
938  }
939  if (cleanup_state_ == CLEANUP_STARTING) {
940    // Another worker thread is cleaning up, we idle here until thats done.
941    ++cleanup_idlers_;
942    cleanup_cv_.Broadcast();
943    while (cleanup_state_ != CLEANUP_FINISHING) {
944      cleanup_cv_.Wait();
945    }
946    --cleanup_idlers_;
947    cleanup_cv_.Broadcast();
948    return;
949  }
950  if (cleanup_state_ == CLEANUP_FINISHING) {
951    // We wait for all idlers to wake up prior to being DONE.
952    while (cleanup_idlers_ != 0) {
953      cleanup_cv_.Broadcast();
954      cleanup_cv_.Wait();
955    }
956    if (cleanup_state_ == CLEANUP_FINISHING) {
957      cleanup_state_ = CLEANUP_DONE;
958      cleanup_cv_.Signal();
959    }
960    return;
961  }
962}
963
964int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
965    const std::string& name) {
966  lock_.AssertAcquired();
967  DCHECK(!name.empty());
968
969  std::map<std::string, int>::const_iterator found =
970      named_sequence_tokens_.find(name);
971  if (found != named_sequence_tokens_.end())
972    return found->second;  // Got an existing one.
973
974  // Create a new one for this name.
975  SequenceToken result = GetSequenceToken();
976  named_sequence_tokens_.insert(std::make_pair(name, result.id_));
977  return result.id_;
978}
979
980int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
981  lock_.AssertAcquired();
982  // We assume that we never create enough tasks to wrap around.
983  return next_sequence_task_number_++;
984}
985
986SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
987    SequencedTask* task,
988    TimeDelta* wait_time,
989    std::vector<Closure>* delete_these_outside_lock) {
990  lock_.AssertAcquired();
991
992  // Find the next task with a sequence token that's not currently in use.
993  // If the token is in use, that means another thread is running something
994  // in that sequence, and we can't run it without going out-of-order.
995  //
996  // This algorithm is simple and fair, but inefficient in some cases. For
997  // example, say somebody schedules 1000 slow tasks with the same sequence
998  // number. We'll have to go through all those tasks each time we feel like
999  // there might be work to schedule. If this proves to be a problem, we
1000  // should make this more efficient.
1001  //
1002  // One possible enhancement would be to keep a map from sequence ID to a
1003  // list of pending but currently blocked SequencedTasks for that ID.
1004  // When a worker finishes a task of one sequence token, it can pick up the
1005  // next one from that token right away.
1006  //
1007  // This may lead to starvation if there are sufficient numbers of sequences
1008  // in use. To alleviate this, we could add an incrementing priority counter
1009  // to each SequencedTask. Then maintain a priority_queue of all runnable
1010  // tasks, sorted by priority counter. When a sequenced task is completed
1011  // we would pop the head element off of that tasks pending list and add it
1012  // to the priority queue. Then we would run the first item in the priority
1013  // queue.
1014
1015  GetWorkStatus status = GET_WORK_NOT_FOUND;
1016  int unrunnable_tasks = 0;
1017  PendingTaskSet::iterator i = pending_tasks_.begin();
1018  // We assume that the loop below doesn't take too long and so we can just do
1019  // a single call to TimeTicks::Now().
1020  const TimeTicks current_time = TimeTicks::Now();
1021  while (i != pending_tasks_.end()) {
1022    if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
1023      unrunnable_tasks++;
1024      ++i;
1025      continue;
1026    }
1027
1028    if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
1029      // We're shutting down and the task we just found isn't blocking
1030      // shutdown. Delete it and get more work.
1031      //
1032      // Note that we do not want to delete unrunnable tasks. Deleting a task
1033      // can have side effects (like freeing some objects) and deleting a
1034      // task that's supposed to run after one that's currently running could
1035      // cause an obscure crash.
1036      //
1037      // We really want to delete these tasks outside the lock in case the
1038      // closures are holding refs to objects that want to post work from
1039      // their destructorss (which would deadlock). The closures are
1040      // internally refcounted, so we just need to keep a copy of them alive
1041      // until the lock is exited. The calling code can just clear() the
1042      // vector they passed to us once the lock is exited to make this
1043      // happen.
1044      delete_these_outside_lock->push_back(i->task);
1045      pending_tasks_.erase(i++);
1046      continue;
1047    }
1048
1049    if (i->time_to_run > current_time) {
1050      // The time to run has not come yet.
1051      *wait_time = i->time_to_run - current_time;
1052      status = GET_WORK_WAIT;
1053      if (cleanup_state_ == CLEANUP_RUNNING) {
1054        // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
1055        delete_these_outside_lock->push_back(i->task);
1056        pending_tasks_.erase(i);
1057      }
1058      break;
1059    }
1060
1061    // Found a runnable task.
1062    *task = *i;
1063    pending_tasks_.erase(i);
1064    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
1065      blocking_shutdown_pending_task_count_--;
1066    }
1067
1068    status = GET_WORK_FOUND;
1069    break;
1070  }
1071
1072  return status;
1073}
1074
1075int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1076  lock_.AssertAcquired();
1077
1078  // Mark the task's sequence number as in use.
1079  if (task.sequence_token_id)
1080    current_sequences_.insert(task.sequence_token_id);
1081
1082  // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1083  // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1084  // completes.
1085  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
1086    blocking_shutdown_thread_count_++;
1087
1088  // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1089  // creates a new thread if there is no free one, there is a race when posting
1090  // tasks that many tasks could have been posted before a thread started
1091  // running them, so only one thread would have been created. So we also check
1092  // whether we should create more threads after removing our task from the
1093  // queue, which also has the nice side effect of creating the workers from
1094  // background threads rather than the main thread of the app.
1095  //
1096  // If another thread wasn't created, we want to wake up an existing thread
1097  // if there is one waiting to pick up the next task.
1098  //
1099  // Note that we really need to do this *before* running the task, not
1100  // after. Otherwise, if more than one task is posted, the creation of the
1101  // second thread (since we only create one at a time) will be blocked by
1102  // the execution of the first task, which could be arbitrarily long.
1103  return PrepareToStartAdditionalThreadIfHelpful();
1104}
1105
1106void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1107  lock_.AssertAcquired();
1108
1109  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1110    DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1111    blocking_shutdown_thread_count_--;
1112  }
1113
1114  if (task.sequence_token_id)
1115    current_sequences_.erase(task.sequence_token_id);
1116}
1117
1118bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1119    int sequence_token_id) const {
1120  lock_.AssertAcquired();
1121  return !sequence_token_id ||
1122      current_sequences_.find(sequence_token_id) ==
1123          current_sequences_.end();
1124}
1125
1126int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1127  lock_.AssertAcquired();
1128  // How thread creation works:
1129  //
1130  // We'de like to avoid creating threads with the lock held. However, we
1131  // need to be sure that we have an accurate accounting of the threads for
1132  // proper Joining and deltion on shutdown.
1133  //
1134  // We need to figure out if we need another thread with the lock held, which
1135  // is what this function does. It then marks us as in the process of creating
1136  // a thread. When we do shutdown, we wait until the thread_being_created_
1137  // flag is cleared, which ensures that the new thread is properly added to
1138  // all the data structures and we can't leak it. Once shutdown starts, we'll
1139  // refuse to create more threads or they would be leaked.
1140  //
1141  // Note that this creates a mostly benign race condition on shutdown that
1142  // will cause fewer workers to be created than one would expect. It isn't
1143  // much of an issue in real life, but affects some tests. Since we only spawn
1144  // one worker at a time, the following sequence of events can happen:
1145  //
1146  //  1. Main thread posts a bunch of unrelated tasks that would normally be
1147  //     run on separate threads.
1148  //  2. The first task post causes us to start a worker. Other tasks do not
1149  //     cause a worker to start since one is pending.
1150  //  3. Main thread initiates shutdown.
1151  //  4. No more threads are created since the shutdown_called_ flag is set.
1152  //
1153  // The result is that one may expect that max_threads_ workers to be created
1154  // given the workload, but in reality fewer may be created because the
1155  // sequence of thread creation on the background threads is racing with the
1156  // shutdown call.
1157  if (!shutdown_called_ &&
1158      !thread_being_created_ &&
1159      cleanup_state_ == CLEANUP_DONE &&
1160      threads_.size() < max_threads_ &&
1161      waiting_thread_count_ == 0) {
1162    // We could use an additional thread if there's work to be done.
1163    for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1164         i != pending_tasks_.end(); ++i) {
1165      if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1166        // Found a runnable task, mark the thread as being started.
1167        thread_being_created_ = true;
1168        return static_cast<int>(threads_.size() + 1);
1169      }
1170    }
1171  }
1172  return 0;
1173}
1174
1175void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1176    int thread_number) {
1177  // Called outside of the lock.
1178  DCHECK_GT(thread_number, 0);
1179
1180  // The worker is assigned to the list when the thread actually starts, which
1181  // will manage the memory of the pointer.
1182  new Worker(worker_pool_, thread_number, thread_name_prefix_);
1183}
1184
1185void SequencedWorkerPool::Inner::SignalHasWork() {
1186  has_work_cv_.Signal();
1187  if (testing_observer_) {
1188    testing_observer_->OnHasWork();
1189  }
1190}
1191
1192bool SequencedWorkerPool::Inner::CanShutdown() const {
1193  lock_.AssertAcquired();
1194  // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1195  return !thread_being_created_ &&
1196         blocking_shutdown_thread_count_ == 0 &&
1197         blocking_shutdown_pending_task_count_ == 0;
1198}
1199
1200base::StaticAtomicSequenceNumber
1201SequencedWorkerPool::Inner::g_last_sequence_number_;
1202
1203// SequencedWorkerPool --------------------------------------------------------
1204
1205std::string SequencedWorkerPool::SequenceToken::ToString() const {
1206  return base::StringPrintf("[%d]", id_);
1207}
1208
1209// static
1210SequencedWorkerPool::SequenceToken
1211SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1212  Worker* worker = Worker::GetForCurrentThread();
1213  if (!worker)
1214    return SequenceToken();
1215
1216  return worker->task_sequence_token();
1217}
1218
1219// static
1220scoped_refptr<SequencedWorkerPool>
1221SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1222  Worker* worker = Worker::GetForCurrentThread();
1223  if (!worker)
1224    return nullptr;
1225
1226  return worker->worker_pool();
1227}
1228
1229// static
1230scoped_refptr<SequencedTaskRunner>
1231SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1232  Worker* worker = Worker::GetForCurrentThread();
1233
1234  // If there is no worker, this thread is not a worker thread. Otherwise, it is
1235  // currently running a task (sequenced or unsequenced).
1236  if (!worker)
1237    return nullptr;
1238
1239  scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
1240  SequenceToken sequence_token = worker->task_sequence_token();
1241  WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1242  if (!sequence_token.IsValid()) {
1243    // Create a new sequence token and bind this thread to it, to make sure that
1244    // a task posted to the SequencedTaskRunner we are going to return is not
1245    // immediately going to run on a different thread.
1246    sequence_token = Inner::GetSequenceToken();
1247    pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1248                                                     shutdown_behavior);
1249  }
1250
1251  DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1252  return new SequencedWorkerPoolSequencedTaskRunner(
1253      std::move(pool), sequence_token, shutdown_behavior);
1254}
1255
1256SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1257                                         const std::string& thread_name_prefix)
1258    : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1259      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1260}
1261
1262SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1263                                         const std::string& thread_name_prefix,
1264                                         TestingObserver* observer)
1265    : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1266      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1267}
1268
1269SequencedWorkerPool::~SequencedWorkerPool() {}
1270
1271void SequencedWorkerPool::OnDestruct() const {
1272  // Avoid deleting ourselves on a worker thread (which would deadlock).
1273  if (RunsTasksOnCurrentThread()) {
1274    constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1275  } else {
1276    delete this;
1277  }
1278}
1279
1280// static
1281SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1282  return Inner::GetSequenceToken();
1283}
1284
1285SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1286    const std::string& name) {
1287  return inner_->GetNamedSequenceToken(name);
1288}
1289
1290scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1291    SequenceToken token) {
1292  return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1293}
1294
1295scoped_refptr<SequencedTaskRunner>
1296SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1297    SequenceToken token, WorkerShutdown shutdown_behavior) {
1298  return new SequencedWorkerPoolSequencedTaskRunner(
1299      this, token, shutdown_behavior);
1300}
1301
1302scoped_refptr<TaskRunner>
1303SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1304    WorkerShutdown shutdown_behavior) {
1305  return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1306}
1307
1308bool SequencedWorkerPool::PostWorkerTask(
1309    const tracked_objects::Location& from_here,
1310    const Closure& task) {
1311  return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1312                          from_here, task, TimeDelta());
1313}
1314
1315bool SequencedWorkerPool::PostDelayedWorkerTask(
1316    const tracked_objects::Location& from_here,
1317    const Closure& task,
1318    TimeDelta delay) {
1319  WorkerShutdown shutdown_behavior =
1320      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1321  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1322                          from_here, task, delay);
1323}
1324
1325bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1326    const tracked_objects::Location& from_here,
1327    const Closure& task,
1328    WorkerShutdown shutdown_behavior) {
1329  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1330                          from_here, task, TimeDelta());
1331}
1332
1333bool SequencedWorkerPool::PostSequencedWorkerTask(
1334    SequenceToken sequence_token,
1335    const tracked_objects::Location& from_here,
1336    const Closure& task) {
1337  return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1338                          from_here, task, TimeDelta());
1339}
1340
1341bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1342    SequenceToken sequence_token,
1343    const tracked_objects::Location& from_here,
1344    const Closure& task,
1345    TimeDelta delay) {
1346  WorkerShutdown shutdown_behavior =
1347      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1348  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1349                          from_here, task, delay);
1350}
1351
1352bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1353    const std::string& token_name,
1354    const tracked_objects::Location& from_here,
1355    const Closure& task) {
1356  DCHECK(!token_name.empty());
1357  return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1358                          from_here, task, TimeDelta());
1359}
1360
1361bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1362    SequenceToken sequence_token,
1363    const tracked_objects::Location& from_here,
1364    const Closure& task,
1365    WorkerShutdown shutdown_behavior) {
1366  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1367                          from_here, task, TimeDelta());
1368}
1369
1370bool SequencedWorkerPool::PostDelayedTask(
1371    const tracked_objects::Location& from_here,
1372    const Closure& task,
1373    TimeDelta delay) {
1374  return PostDelayedWorkerTask(from_here, task, delay);
1375}
1376
1377bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1378  return inner_->RunsTasksOnCurrentThread();
1379}
1380
1381bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1382    SequenceToken sequence_token) const {
1383  return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1384}
1385
1386bool SequencedWorkerPool::IsRunningSequence(
1387    SequenceToken sequence_token) const {
1388  return inner_->IsRunningSequence(sequence_token);
1389}
1390
1391void SequencedWorkerPool::FlushForTesting() {
1392  inner_->CleanupForTesting();
1393}
1394
1395void SequencedWorkerPool::SignalHasWorkForTesting() {
1396  inner_->SignalHasWorkForTesting();
1397}
1398
1399void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1400  DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1401  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1402}
1403
1404bool SequencedWorkerPool::IsShutdownInProgress() {
1405  return inner_->IsShutdownInProgress();
1406}
1407
1408}  // namespace base
1409