1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/sequenced_worker_pool.h"
6
7#include <list>
8#include <map>
9#include <set>
10#include <utility>
11#include <vector>
12
13#include "base/atomic_sequence_num.h"
14#include "base/callback.h"
15#include "base/compiler_specific.h"
16#include "base/critical_closure.h"
17#include "base/debug/trace_event.h"
18#include "base/lazy_instance.h"
19#include "base/logging.h"
20#include "base/memory/linked_ptr.h"
21#include "base/message_loop/message_loop_proxy.h"
22#include "base/stl_util.h"
23#include "base/strings/stringprintf.h"
24#include "base/synchronization/condition_variable.h"
25#include "base/synchronization/lock.h"
26#include "base/threading/platform_thread.h"
27#include "base/threading/simple_thread.h"
28#include "base/threading/thread_local.h"
29#include "base/threading/thread_restrictions.h"
30#include "base/time/time.h"
31#include "base/tracked_objects.h"
32
33#if defined(OS_MACOSX)
34#include "base/mac/scoped_nsautorelease_pool.h"
35#elif defined(OS_WIN)
36#include "base/win/scoped_com_initializer.h"
37#endif
38
39#if !defined(OS_NACL)
40#include "base/metrics/histogram.h"
41#endif
42
43namespace base {
44
45namespace {
46
47struct SequencedTask : public TrackingInfo  {
48  SequencedTask()
49      : sequence_token_id(0),
50        trace_id(0),
51        sequence_task_number(0),
52        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
53
54  explicit SequencedTask(const tracked_objects::Location& from_here)
55      : base::TrackingInfo(from_here, TimeTicks()),
56        sequence_token_id(0),
57        trace_id(0),
58        sequence_task_number(0),
59        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
60
61  ~SequencedTask() {}
62
63  int sequence_token_id;
64  int trace_id;
65  int64 sequence_task_number;
66  SequencedWorkerPool::WorkerShutdown shutdown_behavior;
67  tracked_objects::Location posted_from;
68  Closure task;
69
70  // Non-delayed tasks and delayed tasks are managed together by time-to-run
71  // order. We calculate the time by adding the posted time and the given delay.
72  TimeTicks time_to_run;
73};
74
75struct SequencedTaskLessThan {
76 public:
77  bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
78    if (lhs.time_to_run < rhs.time_to_run)
79      return true;
80
81    if (lhs.time_to_run > rhs.time_to_run)
82      return false;
83
84    // If the time happen to match, then we use the sequence number to decide.
85    return lhs.sequence_task_number < rhs.sequence_task_number;
86  }
87};
88
89// SequencedWorkerPoolTaskRunner ---------------------------------------------
90// A TaskRunner which posts tasks to a SequencedWorkerPool with a
91// fixed ShutdownBehavior.
92//
93// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
94class SequencedWorkerPoolTaskRunner : public TaskRunner {
95 public:
96  SequencedWorkerPoolTaskRunner(
97      const scoped_refptr<SequencedWorkerPool>& pool,
98      SequencedWorkerPool::WorkerShutdown shutdown_behavior);
99
100  // TaskRunner implementation
101  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
102                               const Closure& task,
103                               TimeDelta delay) OVERRIDE;
104  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
105
106 private:
107  virtual ~SequencedWorkerPoolTaskRunner();
108
109  const scoped_refptr<SequencedWorkerPool> pool_;
110
111  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
112
113  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
114};
115
116SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
117    const scoped_refptr<SequencedWorkerPool>& pool,
118    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
119    : pool_(pool),
120      shutdown_behavior_(shutdown_behavior) {
121}
122
123SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
124}
125
126bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
127    const tracked_objects::Location& from_here,
128    const Closure& task,
129    TimeDelta delay) {
130  if (delay == TimeDelta()) {
131    return pool_->PostWorkerTaskWithShutdownBehavior(
132        from_here, task, shutdown_behavior_);
133  }
134  return pool_->PostDelayedWorkerTask(from_here, task, delay);
135}
136
137bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
138  return pool_->RunsTasksOnCurrentThread();
139}
140
141// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
142// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
143// fixed sequence token.
144//
145// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
146class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
147 public:
148  SequencedWorkerPoolSequencedTaskRunner(
149      const scoped_refptr<SequencedWorkerPool>& pool,
150      SequencedWorkerPool::SequenceToken token,
151      SequencedWorkerPool::WorkerShutdown shutdown_behavior);
152
153  // TaskRunner implementation
154  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
155                               const Closure& task,
156                               TimeDelta delay) OVERRIDE;
157  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
158
159  // SequencedTaskRunner implementation
160  virtual bool PostNonNestableDelayedTask(
161      const tracked_objects::Location& from_here,
162      const Closure& task,
163      TimeDelta delay) OVERRIDE;
164
165 private:
166  virtual ~SequencedWorkerPoolSequencedTaskRunner();
167
168  const scoped_refptr<SequencedWorkerPool> pool_;
169
170  const SequencedWorkerPool::SequenceToken token_;
171
172  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
173
174  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
175};
176
177SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
178    const scoped_refptr<SequencedWorkerPool>& pool,
179    SequencedWorkerPool::SequenceToken token,
180    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
181    : pool_(pool),
182      token_(token),
183      shutdown_behavior_(shutdown_behavior) {
184}
185
186SequencedWorkerPoolSequencedTaskRunner::
187~SequencedWorkerPoolSequencedTaskRunner() {
188}
189
190bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
191    const tracked_objects::Location& from_here,
192    const Closure& task,
193    TimeDelta delay) {
194  if (delay == TimeDelta()) {
195    return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
196        token_, from_here, task, shutdown_behavior_);
197  }
198  return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
199}
200
201bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
202  return pool_->IsRunningSequenceOnCurrentThread(token_);
203}
204
205bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
206    const tracked_objects::Location& from_here,
207    const Closure& task,
208    TimeDelta delay) {
209  // There's no way to run nested tasks, so simply forward to
210  // PostDelayedTask.
211  return PostDelayedTask(from_here, task, delay);
212}
213
214// Create a process-wide unique ID to represent this task in trace events. This
215// will be mangled with a Process ID hash to reduce the likelyhood of colliding
216// with MessageLoop pointers on other processes.
217uint64 GetTaskTraceID(const SequencedTask& task,
218                      void* pool) {
219  return (static_cast<uint64>(task.trace_id) << 32) |
220         static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
221}
222
223base::LazyInstance<base::ThreadLocalPointer<
224    SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
225        LAZY_INSTANCE_INITIALIZER;
226
227}  // namespace
228
229// Worker ---------------------------------------------------------------------
230
231class SequencedWorkerPool::Worker : public SimpleThread {
232 public:
233  // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
234  // around as long as we are running.
235  Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
236         int thread_number,
237         const std::string& thread_name_prefix);
238  virtual ~Worker();
239
240  // SimpleThread implementation. This actually runs the background thread.
241  virtual void Run() OVERRIDE;
242
243  void set_running_task_info(SequenceToken token,
244                             WorkerShutdown shutdown_behavior) {
245    running_sequence_ = token;
246    running_shutdown_behavior_ = shutdown_behavior;
247  }
248
249  SequenceToken running_sequence() const {
250    return running_sequence_;
251  }
252
253  WorkerShutdown running_shutdown_behavior() const {
254    return running_shutdown_behavior_;
255  }
256
257 private:
258  scoped_refptr<SequencedWorkerPool> worker_pool_;
259  SequenceToken running_sequence_;
260  WorkerShutdown running_shutdown_behavior_;
261
262  DISALLOW_COPY_AND_ASSIGN(Worker);
263};
264
265// Inner ----------------------------------------------------------------------
266
267class SequencedWorkerPool::Inner {
268 public:
269  // Take a raw pointer to |worker| to avoid cycles (since we're owned
270  // by it).
271  Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
272        const std::string& thread_name_prefix,
273        TestingObserver* observer);
274
275  ~Inner();
276
277  SequenceToken GetSequenceToken();
278
279  SequenceToken GetNamedSequenceToken(const std::string& name);
280
281  // This function accepts a name and an ID. If the name is null, the
282  // token ID is used. This allows us to implement the optional name lookup
283  // from a single function without having to enter the lock a separate time.
284  bool PostTask(const std::string* optional_token_name,
285                SequenceToken sequence_token,
286                WorkerShutdown shutdown_behavior,
287                const tracked_objects::Location& from_here,
288                const Closure& task,
289                TimeDelta delay);
290
291  bool RunsTasksOnCurrentThread() const;
292
293  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
294
295  void CleanupForTesting();
296
297  void SignalHasWorkForTesting();
298
299  int GetWorkSignalCountForTesting() const;
300
301  void Shutdown(int max_blocking_tasks_after_shutdown);
302
303  bool IsShutdownInProgress();
304
305  // Runs the worker loop on the background thread.
306  void ThreadLoop(Worker* this_worker);
307
308 private:
309  enum GetWorkStatus {
310    GET_WORK_FOUND,
311    GET_WORK_NOT_FOUND,
312    GET_WORK_WAIT,
313  };
314
315  enum CleanupState {
316    CLEANUP_REQUESTED,
317    CLEANUP_STARTING,
318    CLEANUP_RUNNING,
319    CLEANUP_FINISHING,
320    CLEANUP_DONE,
321  };
322
323  // Called from within the lock, this converts the given token name into a
324  // token ID, creating a new one if necessary.
325  int LockedGetNamedTokenID(const std::string& name);
326
327  // Called from within the lock, this returns the next sequence task number.
328  int64 LockedGetNextSequenceTaskNumber();
329
330  // Called from within the lock, returns the shutdown behavior of the task
331  // running on the currently executing worker thread. If invoked from a thread
332  // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
333  WorkerShutdown LockedCurrentThreadShutdownBehavior() const;
334
335  // Gets new task. There are 3 cases depending on the return value:
336  //
337  // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
338  //    be run immediately.
339  // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
340  //    and |task| is not filled in. In this case, the caller should wait until
341  //    a task is posted.
342  // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
343  //    immediately, and |task| is not filled in. Likewise, |wait_time| is
344  //    filled in the time to wait until the next task to run. In this case, the
345  //    caller should wait the time.
346  //
347  // In any case, the calling code should clear the given
348  // delete_these_outside_lock vector the next time the lock is released.
349  // See the implementation for a more detailed description.
350  GetWorkStatus GetWork(SequencedTask* task,
351                        TimeDelta* wait_time,
352                        std::vector<Closure>* delete_these_outside_lock);
353
354  void HandleCleanup();
355
356  // Peforms init and cleanup around running the given task. WillRun...
357  // returns the value from PrepareToStartAdditionalThreadIfNecessary.
358  // The calling code should call FinishStartingAdditionalThread once the
359  // lock is released if the return values is nonzero.
360  int WillRunWorkerTask(const SequencedTask& task);
361  void DidRunWorkerTask(const SequencedTask& task);
362
363  // Returns true if there are no threads currently running the given
364  // sequence token.
365  bool IsSequenceTokenRunnable(int sequence_token_id) const;
366
367  // Checks if all threads are busy and the addition of one more could run an
368  // additional task waiting in the queue. This must be called from within
369  // the lock.
370  //
371  // If another thread is helpful, this will mark the thread as being in the
372  // process of starting and returns the index of the new thread which will be
373  // 0 or more. The caller should then call FinishStartingAdditionalThread to
374  // complete initialization once the lock is released.
375  //
376  // If another thread is not necessary, returne 0;
377  //
378  // See the implementedion for more.
379  int PrepareToStartAdditionalThreadIfHelpful();
380
381  // The second part of thread creation after
382  // PrepareToStartAdditionalThreadIfHelpful with the thread number it
383  // generated. This actually creates the thread and should be called outside
384  // the lock to avoid blocking important work starting a thread in the lock.
385  void FinishStartingAdditionalThread(int thread_number);
386
387  // Signal |has_work_| and increment |has_work_signal_count_|.
388  void SignalHasWork();
389
390  // Checks whether there is work left that's blocking shutdown. Must be
391  // called inside the lock.
392  bool CanShutdown() const;
393
394  SequencedWorkerPool* const worker_pool_;
395
396  // The last sequence number used. Managed by GetSequenceToken, since this
397  // only does threadsafe increment operations, you do not need to hold the
398  // lock. This is class-static to make SequenceTokens issued by
399  // GetSequenceToken unique across SequencedWorkerPool instances.
400  static base::StaticAtomicSequenceNumber g_last_sequence_number_;
401
402  // This lock protects |everything in this class|. Do not read or modify
403  // anything without holding this lock. Do not block while holding this
404  // lock.
405  mutable Lock lock_;
406
407  // Condition variable that is waited on by worker threads until new
408  // tasks are posted or shutdown starts.
409  ConditionVariable has_work_cv_;
410
411  // Condition variable that is waited on by non-worker threads (in
412  // Shutdown()) until CanShutdown() goes to true.
413  ConditionVariable can_shutdown_cv_;
414
415  // The maximum number of worker threads we'll create.
416  const size_t max_threads_;
417
418  const std::string thread_name_prefix_;
419
420  // Associates all known sequence token names with their IDs.
421  std::map<std::string, int> named_sequence_tokens_;
422
423  // Owning pointers to all threads we've created so far, indexed by
424  // ID. Since we lazily create threads, this may be less than
425  // max_threads_ and will be initially empty.
426  typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
427  ThreadMap threads_;
428
429  // Set to true when we're in the process of creating another thread.
430  // See PrepareToStartAdditionalThreadIfHelpful for more.
431  bool thread_being_created_;
432
433  // Number of threads currently waiting for work.
434  size_t waiting_thread_count_;
435
436  // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
437  // or SKIP_ON_SHUTDOWN flag set.
438  size_t blocking_shutdown_thread_count_;
439
440  // A set of all pending tasks in time-to-run order. These are tasks that are
441  // either waiting for a thread to run on, waiting for their time to run,
442  // or blocked on a previous task in their sequence. We have to iterate over
443  // the tasks by time-to-run order, so we use the set instead of the
444  // traditional priority_queue.
445  typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
446  PendingTaskSet pending_tasks_;
447
448  // The next sequence number for a new sequenced task.
449  int64 next_sequence_task_number_;
450
451  // Number of tasks in the pending_tasks_ list that are marked as blocking
452  // shutdown.
453  size_t blocking_shutdown_pending_task_count_;
454
455  // Lists all sequence tokens currently executing.
456  std::set<int> current_sequences_;
457
458  // An ID for each posted task to distinguish the task from others in traces.
459  int trace_id_;
460
461  // Set when Shutdown is called and no further tasks should be
462  // allowed, though we may still be running existing tasks.
463  bool shutdown_called_;
464
465  // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown()
466  // has been called.
467  int max_blocking_tasks_after_shutdown_;
468
469  // State used to cleanup for testing, all guarded by lock_.
470  CleanupState cleanup_state_;
471  size_t cleanup_idlers_;
472  ConditionVariable cleanup_cv_;
473
474  TestingObserver* const testing_observer_;
475
476  DISALLOW_COPY_AND_ASSIGN(Inner);
477};
478
479// Worker definitions ---------------------------------------------------------
480
481SequencedWorkerPool::Worker::Worker(
482    const scoped_refptr<SequencedWorkerPool>& worker_pool,
483    int thread_number,
484    const std::string& prefix)
485    : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
486      worker_pool_(worker_pool),
487      running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
488  Start();
489}
490
491SequencedWorkerPool::Worker::~Worker() {
492}
493
494void SequencedWorkerPool::Worker::Run() {
495#if defined(OS_WIN)
496  win::ScopedCOMInitializer com_initializer;
497#endif
498
499  // Store a pointer to the running sequence in thread local storage for
500  // static function access.
501  g_lazy_tls_ptr.Get().Set(&running_sequence_);
502
503  // Just jump back to the Inner object to run the thread, since it has all the
504  // tracking information and queues. It might be more natural to implement
505  // using DelegateSimpleThread and have Inner implement the Delegate to avoid
506  // having these worker objects at all, but that method lacks the ability to
507  // send thread-specific information easily to the thread loop.
508  worker_pool_->inner_->ThreadLoop(this);
509  // Release our cyclic reference once we're done.
510  worker_pool_ = NULL;
511}
512
513// Inner definitions ---------------------------------------------------------
514
515SequencedWorkerPool::Inner::Inner(
516    SequencedWorkerPool* worker_pool,
517    size_t max_threads,
518    const std::string& thread_name_prefix,
519    TestingObserver* observer)
520    : worker_pool_(worker_pool),
521      lock_(),
522      has_work_cv_(&lock_),
523      can_shutdown_cv_(&lock_),
524      max_threads_(max_threads),
525      thread_name_prefix_(thread_name_prefix),
526      thread_being_created_(false),
527      waiting_thread_count_(0),
528      blocking_shutdown_thread_count_(0),
529      next_sequence_task_number_(0),
530      blocking_shutdown_pending_task_count_(0),
531      trace_id_(0),
532      shutdown_called_(false),
533      max_blocking_tasks_after_shutdown_(0),
534      cleanup_state_(CLEANUP_DONE),
535      cleanup_idlers_(0),
536      cleanup_cv_(&lock_),
537      testing_observer_(observer) {}
538
539SequencedWorkerPool::Inner::~Inner() {
540  // You must call Shutdown() before destroying the pool.
541  DCHECK(shutdown_called_);
542
543  // Need to explicitly join with the threads before they're destroyed or else
544  // they will be running when our object is half torn down.
545  for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
546    it->second->Join();
547  threads_.clear();
548
549  if (testing_observer_)
550    testing_observer_->OnDestruct();
551}
552
553SequencedWorkerPool::SequenceToken
554SequencedWorkerPool::Inner::GetSequenceToken() {
555  // Need to add one because StaticAtomicSequenceNumber starts at zero, which
556  // is used as a sentinel value in SequenceTokens.
557  return SequenceToken(g_last_sequence_number_.GetNext() + 1);
558}
559
560SequencedWorkerPool::SequenceToken
561SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
562  AutoLock lock(lock_);
563  return SequenceToken(LockedGetNamedTokenID(name));
564}
565
566bool SequencedWorkerPool::Inner::PostTask(
567    const std::string* optional_token_name,
568    SequenceToken sequence_token,
569    WorkerShutdown shutdown_behavior,
570    const tracked_objects::Location& from_here,
571    const Closure& task,
572    TimeDelta delay) {
573  DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
574  SequencedTask sequenced(from_here);
575  sequenced.sequence_token_id = sequence_token.id_;
576  sequenced.shutdown_behavior = shutdown_behavior;
577  sequenced.posted_from = from_here;
578  sequenced.task =
579      shutdown_behavior == BLOCK_SHUTDOWN ?
580      base::MakeCriticalClosure(task) : task;
581  sequenced.time_to_run = TimeTicks::Now() + delay;
582
583  int create_thread_id = 0;
584  {
585    AutoLock lock(lock_);
586    if (shutdown_called_) {
587      if (shutdown_behavior != BLOCK_SHUTDOWN ||
588          LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) {
589        return false;
590      }
591      if (max_blocking_tasks_after_shutdown_ <= 0) {
592        DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
593        return false;
594      }
595      max_blocking_tasks_after_shutdown_ -= 1;
596    }
597
598    // The trace_id is used for identifying the task in about:tracing.
599    sequenced.trace_id = trace_id_++;
600
601    TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
602        "SequencedWorkerPool::PostTask",
603        TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))));
604
605    sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
606
607    // Now that we have the lock, apply the named token rules.
608    if (optional_token_name)
609      sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
610
611    pending_tasks_.insert(sequenced);
612    if (shutdown_behavior == BLOCK_SHUTDOWN)
613      blocking_shutdown_pending_task_count_++;
614
615    create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
616  }
617
618  // Actually start the additional thread or signal an existing one now that
619  // we're outside the lock.
620  if (create_thread_id)
621    FinishStartingAdditionalThread(create_thread_id);
622  else
623    SignalHasWork();
624
625  return true;
626}
627
628bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
629  AutoLock lock(lock_);
630  return ContainsKey(threads_, PlatformThread::CurrentId());
631}
632
633bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
634    SequenceToken sequence_token) const {
635  AutoLock lock(lock_);
636  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
637  if (found == threads_.end())
638    return false;
639  return sequence_token.Equals(found->second->running_sequence());
640}
641
642// See https://code.google.com/p/chromium/issues/detail?id=168415
643void SequencedWorkerPool::Inner::CleanupForTesting() {
644  DCHECK(!RunsTasksOnCurrentThread());
645  base::ThreadRestrictions::ScopedAllowWait allow_wait;
646  AutoLock lock(lock_);
647  CHECK_EQ(CLEANUP_DONE, cleanup_state_);
648  if (shutdown_called_)
649    return;
650  if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
651    return;
652  cleanup_state_ = CLEANUP_REQUESTED;
653  cleanup_idlers_ = 0;
654  has_work_cv_.Signal();
655  while (cleanup_state_ != CLEANUP_DONE)
656    cleanup_cv_.Wait();
657}
658
659void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
660  SignalHasWork();
661}
662
663void SequencedWorkerPool::Inner::Shutdown(
664    int max_new_blocking_tasks_after_shutdown) {
665  DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
666  {
667    AutoLock lock(lock_);
668    // Cleanup and Shutdown should not be called concurrently.
669    CHECK_EQ(CLEANUP_DONE, cleanup_state_);
670    if (shutdown_called_)
671      return;
672    shutdown_called_ = true;
673    max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
674
675    // Tickle the threads. This will wake up a waiting one so it will know that
676    // it can exit, which in turn will wake up any other waiting ones.
677    SignalHasWork();
678
679    // There are no pending or running tasks blocking shutdown, we're done.
680    if (CanShutdown())
681      return;
682  }
683
684  // If we're here, then something is blocking shutdown.  So wait for
685  // CanShutdown() to go to true.
686
687  if (testing_observer_)
688    testing_observer_->WillWaitForShutdown();
689
690#if !defined(OS_NACL)
691  TimeTicks shutdown_wait_begin = TimeTicks::Now();
692#endif
693
694  {
695    base::ThreadRestrictions::ScopedAllowWait allow_wait;
696    AutoLock lock(lock_);
697    while (!CanShutdown())
698      can_shutdown_cv_.Wait();
699  }
700#if !defined(OS_NACL)
701  UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
702                      TimeTicks::Now() - shutdown_wait_begin);
703#endif
704}
705
706bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
707    AutoLock lock(lock_);
708    return shutdown_called_;
709}
710
711void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
712  {
713    AutoLock lock(lock_);
714    DCHECK(thread_being_created_);
715    thread_being_created_ = false;
716    std::pair<ThreadMap::iterator, bool> result =
717        threads_.insert(
718            std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
719    DCHECK(result.second);
720
721    while (true) {
722#if defined(OS_MACOSX)
723      base::mac::ScopedNSAutoreleasePool autorelease_pool;
724#endif
725
726      HandleCleanup();
727
728      // See GetWork for what delete_these_outside_lock is doing.
729      SequencedTask task;
730      TimeDelta wait_time;
731      std::vector<Closure> delete_these_outside_lock;
732      GetWorkStatus status =
733          GetWork(&task, &wait_time, &delete_these_outside_lock);
734      if (status == GET_WORK_FOUND) {
735        TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
736            "SequencedWorkerPool::PostTask",
737            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
738        TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop",
739                     "src_file", task.posted_from.file_name(),
740                     "src_func", task.posted_from.function_name());
741        int new_thread_id = WillRunWorkerTask(task);
742        {
743          AutoUnlock unlock(lock_);
744          // There may be more work available, so wake up another
745          // worker thread. (Technically not required, since we
746          // already get a signal for each new task, but it doesn't
747          // hurt.)
748          SignalHasWork();
749          delete_these_outside_lock.clear();
750
751          // Complete thread creation outside the lock if necessary.
752          if (new_thread_id)
753            FinishStartingAdditionalThread(new_thread_id);
754
755          this_worker->set_running_task_info(
756              SequenceToken(task.sequence_token_id), task.shutdown_behavior);
757
758          tracked_objects::ThreadData::PrepareForStartOfRun(task.birth_tally);
759          tracked_objects::TaskStopwatch stopwatch;
760          task.task.Run();
761          stopwatch.Stop();
762
763          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
764              task, stopwatch);
765
766          // Make sure our task is erased outside the lock for the
767          // same reason we do this with delete_these_oustide_lock.
768          // Also, do it before calling set_running_task_info() so
769          // that sequence-checking from within the task's destructor
770          // still works.
771          task.task = Closure();
772
773          this_worker->set_running_task_info(
774              SequenceToken(), CONTINUE_ON_SHUTDOWN);
775        }
776        DidRunWorkerTask(task);  // Must be done inside the lock.
777      } else if (cleanup_state_ == CLEANUP_RUNNING) {
778        switch (status) {
779          case GET_WORK_WAIT: {
780              AutoUnlock unlock(lock_);
781              delete_these_outside_lock.clear();
782            }
783            break;
784          case GET_WORK_NOT_FOUND:
785            CHECK(delete_these_outside_lock.empty());
786            cleanup_state_ = CLEANUP_FINISHING;
787            cleanup_cv_.Broadcast();
788            break;
789          default:
790            NOTREACHED();
791        }
792      } else {
793        // When we're terminating and there's no more work, we can
794        // shut down, other workers can complete any pending or new tasks.
795        // We can get additional tasks posted after shutdown_called_ is set
796        // but only worker threads are allowed to post tasks at that time, and
797        // the workers responsible for posting those tasks will be available
798        // to run them. Also, there may be some tasks stuck behind running
799        // ones with the same sequence token, but additional threads won't
800        // help this case.
801        if (shutdown_called_ &&
802            blocking_shutdown_pending_task_count_ == 0)
803          break;
804        waiting_thread_count_++;
805
806        switch (status) {
807          case GET_WORK_NOT_FOUND:
808            has_work_cv_.Wait();
809            break;
810          case GET_WORK_WAIT:
811            has_work_cv_.TimedWait(wait_time);
812            break;
813          default:
814            NOTREACHED();
815        }
816        waiting_thread_count_--;
817      }
818    }
819  }  // Release lock_.
820
821  // We noticed we should exit. Wake up the next worker so it knows it should
822  // exit as well (because the Shutdown() code only signals once).
823  SignalHasWork();
824
825  // Possibly unblock shutdown.
826  can_shutdown_cv_.Signal();
827}
828
829void SequencedWorkerPool::Inner::HandleCleanup() {
830  lock_.AssertAcquired();
831  if (cleanup_state_ == CLEANUP_DONE)
832    return;
833  if (cleanup_state_ == CLEANUP_REQUESTED) {
834    // We win, we get to do the cleanup as soon as the others wise up and idle.
835    cleanup_state_ = CLEANUP_STARTING;
836    while (thread_being_created_ ||
837           cleanup_idlers_ != threads_.size() - 1) {
838      has_work_cv_.Signal();
839      cleanup_cv_.Wait();
840    }
841    cleanup_state_ = CLEANUP_RUNNING;
842    return;
843  }
844  if (cleanup_state_ == CLEANUP_STARTING) {
845    // Another worker thread is cleaning up, we idle here until thats done.
846    ++cleanup_idlers_;
847    cleanup_cv_.Broadcast();
848    while (cleanup_state_ != CLEANUP_FINISHING) {
849      cleanup_cv_.Wait();
850    }
851    --cleanup_idlers_;
852    cleanup_cv_.Broadcast();
853    return;
854  }
855  if (cleanup_state_ == CLEANUP_FINISHING) {
856    // We wait for all idlers to wake up prior to being DONE.
857    while (cleanup_idlers_ != 0) {
858      cleanup_cv_.Broadcast();
859      cleanup_cv_.Wait();
860    }
861    if (cleanup_state_ == CLEANUP_FINISHING) {
862      cleanup_state_ = CLEANUP_DONE;
863      cleanup_cv_.Signal();
864    }
865    return;
866  }
867}
868
869int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
870    const std::string& name) {
871  lock_.AssertAcquired();
872  DCHECK(!name.empty());
873
874  std::map<std::string, int>::const_iterator found =
875      named_sequence_tokens_.find(name);
876  if (found != named_sequence_tokens_.end())
877    return found->second;  // Got an existing one.
878
879  // Create a new one for this name.
880  SequenceToken result = GetSequenceToken();
881  named_sequence_tokens_.insert(std::make_pair(name, result.id_));
882  return result.id_;
883}
884
885int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
886  lock_.AssertAcquired();
887  // We assume that we never create enough tasks to wrap around.
888  return next_sequence_task_number_++;
889}
890
891SequencedWorkerPool::WorkerShutdown
892SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
893  lock_.AssertAcquired();
894  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
895  if (found == threads_.end())
896    return CONTINUE_ON_SHUTDOWN;
897  return found->second->running_shutdown_behavior();
898}
899
900SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
901    SequencedTask* task,
902    TimeDelta* wait_time,
903    std::vector<Closure>* delete_these_outside_lock) {
904  lock_.AssertAcquired();
905
906#if !defined(OS_NACL)
907  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
908                           static_cast<int>(pending_tasks_.size()));
909#endif
910
911  // Find the next task with a sequence token that's not currently in use.
912  // If the token is in use, that means another thread is running something
913  // in that sequence, and we can't run it without going out-of-order.
914  //
915  // This algorithm is simple and fair, but inefficient in some cases. For
916  // example, say somebody schedules 1000 slow tasks with the same sequence
917  // number. We'll have to go through all those tasks each time we feel like
918  // there might be work to schedule. If this proves to be a problem, we
919  // should make this more efficient.
920  //
921  // One possible enhancement would be to keep a map from sequence ID to a
922  // list of pending but currently blocked SequencedTasks for that ID.
923  // When a worker finishes a task of one sequence token, it can pick up the
924  // next one from that token right away.
925  //
926  // This may lead to starvation if there are sufficient numbers of sequences
927  // in use. To alleviate this, we could add an incrementing priority counter
928  // to each SequencedTask. Then maintain a priority_queue of all runnable
929  // tasks, sorted by priority counter. When a sequenced task is completed
930  // we would pop the head element off of that tasks pending list and add it
931  // to the priority queue. Then we would run the first item in the priority
932  // queue.
933
934  GetWorkStatus status = GET_WORK_NOT_FOUND;
935  int unrunnable_tasks = 0;
936  PendingTaskSet::iterator i = pending_tasks_.begin();
937  // We assume that the loop below doesn't take too long and so we can just do
938  // a single call to TimeTicks::Now().
939  const TimeTicks current_time = TimeTicks::Now();
940  while (i != pending_tasks_.end()) {
941    if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
942      unrunnable_tasks++;
943      ++i;
944      continue;
945    }
946
947    if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
948      // We're shutting down and the task we just found isn't blocking
949      // shutdown. Delete it and get more work.
950      //
951      // Note that we do not want to delete unrunnable tasks. Deleting a task
952      // can have side effects (like freeing some objects) and deleting a
953      // task that's supposed to run after one that's currently running could
954      // cause an obscure crash.
955      //
956      // We really want to delete these tasks outside the lock in case the
957      // closures are holding refs to objects that want to post work from
958      // their destructorss (which would deadlock). The closures are
959      // internally refcounted, so we just need to keep a copy of them alive
960      // until the lock is exited. The calling code can just clear() the
961      // vector they passed to us once the lock is exited to make this
962      // happen.
963      delete_these_outside_lock->push_back(i->task);
964      pending_tasks_.erase(i++);
965      continue;
966    }
967
968    if (i->time_to_run > current_time) {
969      // The time to run has not come yet.
970      *wait_time = i->time_to_run - current_time;
971      status = GET_WORK_WAIT;
972      if (cleanup_state_ == CLEANUP_RUNNING) {
973        // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
974        delete_these_outside_lock->push_back(i->task);
975        pending_tasks_.erase(i);
976      }
977      break;
978    }
979
980    // Found a runnable task.
981    *task = *i;
982    pending_tasks_.erase(i);
983    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
984      blocking_shutdown_pending_task_count_--;
985    }
986
987    status = GET_WORK_FOUND;
988    break;
989  }
990
991  // Track the number of tasks we had to skip over to see if we should be
992  // making this more efficient. If this number ever becomes large or is
993  // frequently "some", we should consider the optimization above.
994#if !defined(OS_NACL)
995  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
996                           unrunnable_tasks);
997#endif
998  return status;
999}
1000
1001int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1002  lock_.AssertAcquired();
1003
1004  // Mark the task's sequence number as in use.
1005  if (task.sequence_token_id)
1006    current_sequences_.insert(task.sequence_token_id);
1007
1008  // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1009  // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1010  // completes.
1011  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
1012    blocking_shutdown_thread_count_++;
1013
1014  // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1015  // creates a new thread if there is no free one, there is a race when posting
1016  // tasks that many tasks could have been posted before a thread started
1017  // running them, so only one thread would have been created. So we also check
1018  // whether we should create more threads after removing our task from the
1019  // queue, which also has the nice side effect of creating the workers from
1020  // background threads rather than the main thread of the app.
1021  //
1022  // If another thread wasn't created, we want to wake up an existing thread
1023  // if there is one waiting to pick up the next task.
1024  //
1025  // Note that we really need to do this *before* running the task, not
1026  // after. Otherwise, if more than one task is posted, the creation of the
1027  // second thread (since we only create one at a time) will be blocked by
1028  // the execution of the first task, which could be arbitrarily long.
1029  return PrepareToStartAdditionalThreadIfHelpful();
1030}
1031
1032void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1033  lock_.AssertAcquired();
1034
1035  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1036    DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1037    blocking_shutdown_thread_count_--;
1038  }
1039
1040  if (task.sequence_token_id)
1041    current_sequences_.erase(task.sequence_token_id);
1042}
1043
1044bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1045    int sequence_token_id) const {
1046  lock_.AssertAcquired();
1047  return !sequence_token_id ||
1048      current_sequences_.find(sequence_token_id) ==
1049          current_sequences_.end();
1050}
1051
1052int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1053  lock_.AssertAcquired();
1054  // How thread creation works:
1055  //
1056  // We'de like to avoid creating threads with the lock held. However, we
1057  // need to be sure that we have an accurate accounting of the threads for
1058  // proper Joining and deltion on shutdown.
1059  //
1060  // We need to figure out if we need another thread with the lock held, which
1061  // is what this function does. It then marks us as in the process of creating
1062  // a thread. When we do shutdown, we wait until the thread_being_created_
1063  // flag is cleared, which ensures that the new thread is properly added to
1064  // all the data structures and we can't leak it. Once shutdown starts, we'll
1065  // refuse to create more threads or they would be leaked.
1066  //
1067  // Note that this creates a mostly benign race condition on shutdown that
1068  // will cause fewer workers to be created than one would expect. It isn't
1069  // much of an issue in real life, but affects some tests. Since we only spawn
1070  // one worker at a time, the following sequence of events can happen:
1071  //
1072  //  1. Main thread posts a bunch of unrelated tasks that would normally be
1073  //     run on separate threads.
1074  //  2. The first task post causes us to start a worker. Other tasks do not
1075  //     cause a worker to start since one is pending.
1076  //  3. Main thread initiates shutdown.
1077  //  4. No more threads are created since the shutdown_called_ flag is set.
1078  //
1079  // The result is that one may expect that max_threads_ workers to be created
1080  // given the workload, but in reality fewer may be created because the
1081  // sequence of thread creation on the background threads is racing with the
1082  // shutdown call.
1083  if (!shutdown_called_ &&
1084      !thread_being_created_ &&
1085      cleanup_state_ == CLEANUP_DONE &&
1086      threads_.size() < max_threads_ &&
1087      waiting_thread_count_ == 0) {
1088    // We could use an additional thread if there's work to be done.
1089    for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1090         i != pending_tasks_.end(); ++i) {
1091      if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1092        // Found a runnable task, mark the thread as being started.
1093        thread_being_created_ = true;
1094        return static_cast<int>(threads_.size() + 1);
1095      }
1096    }
1097  }
1098  return 0;
1099}
1100
1101void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1102    int thread_number) {
1103  // Called outside of the lock.
1104  DCHECK(thread_number > 0);
1105
1106  // The worker is assigned to the list when the thread actually starts, which
1107  // will manage the memory of the pointer.
1108  new Worker(worker_pool_, thread_number, thread_name_prefix_);
1109}
1110
1111void SequencedWorkerPool::Inner::SignalHasWork() {
1112  has_work_cv_.Signal();
1113  if (testing_observer_) {
1114    testing_observer_->OnHasWork();
1115  }
1116}
1117
1118bool SequencedWorkerPool::Inner::CanShutdown() const {
1119  lock_.AssertAcquired();
1120  // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1121  return !thread_being_created_ &&
1122         blocking_shutdown_thread_count_ == 0 &&
1123         blocking_shutdown_pending_task_count_ == 0;
1124}
1125
1126base::StaticAtomicSequenceNumber
1127SequencedWorkerPool::Inner::g_last_sequence_number_;
1128
1129// SequencedWorkerPool --------------------------------------------------------
1130
1131// static
1132SequencedWorkerPool::SequenceToken
1133SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1134  // Don't construct lazy instance on check.
1135  if (g_lazy_tls_ptr == NULL)
1136    return SequenceToken();
1137
1138  SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
1139  if (!token)
1140    return SequenceToken();
1141  return *token;
1142}
1143
1144SequencedWorkerPool::SequencedWorkerPool(
1145    size_t max_threads,
1146    const std::string& thread_name_prefix)
1147    : constructor_message_loop_(MessageLoopProxy::current()),
1148      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1149}
1150
1151SequencedWorkerPool::SequencedWorkerPool(
1152    size_t max_threads,
1153    const std::string& thread_name_prefix,
1154    TestingObserver* observer)
1155    : constructor_message_loop_(MessageLoopProxy::current()),
1156      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1157}
1158
1159SequencedWorkerPool::~SequencedWorkerPool() {}
1160
1161void SequencedWorkerPool::OnDestruct() const {
1162  DCHECK(constructor_message_loop_.get());
1163  // Avoid deleting ourselves on a worker thread (which would
1164  // deadlock).
1165  if (RunsTasksOnCurrentThread()) {
1166    constructor_message_loop_->DeleteSoon(FROM_HERE, this);
1167  } else {
1168    delete this;
1169  }
1170}
1171
1172SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1173  return inner_->GetSequenceToken();
1174}
1175
1176SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1177    const std::string& name) {
1178  return inner_->GetNamedSequenceToken(name);
1179}
1180
1181scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1182    SequenceToken token) {
1183  return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1184}
1185
1186scoped_refptr<SequencedTaskRunner>
1187SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1188    SequenceToken token, WorkerShutdown shutdown_behavior) {
1189  return new SequencedWorkerPoolSequencedTaskRunner(
1190      this, token, shutdown_behavior);
1191}
1192
1193scoped_refptr<TaskRunner>
1194SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1195    WorkerShutdown shutdown_behavior) {
1196  return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1197}
1198
1199bool SequencedWorkerPool::PostWorkerTask(
1200    const tracked_objects::Location& from_here,
1201    const Closure& task) {
1202  return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1203                          from_here, task, TimeDelta());
1204}
1205
1206bool SequencedWorkerPool::PostDelayedWorkerTask(
1207    const tracked_objects::Location& from_here,
1208    const Closure& task,
1209    TimeDelta delay) {
1210  WorkerShutdown shutdown_behavior =
1211      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1212  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1213                          from_here, task, delay);
1214}
1215
1216bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1217    const tracked_objects::Location& from_here,
1218    const Closure& task,
1219    WorkerShutdown shutdown_behavior) {
1220  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1221                          from_here, task, TimeDelta());
1222}
1223
1224bool SequencedWorkerPool::PostSequencedWorkerTask(
1225    SequenceToken sequence_token,
1226    const tracked_objects::Location& from_here,
1227    const Closure& task) {
1228  return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1229                          from_here, task, TimeDelta());
1230}
1231
1232bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1233    SequenceToken sequence_token,
1234    const tracked_objects::Location& from_here,
1235    const Closure& task,
1236    TimeDelta delay) {
1237  WorkerShutdown shutdown_behavior =
1238      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1239  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1240                          from_here, task, delay);
1241}
1242
1243bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1244    const std::string& token_name,
1245    const tracked_objects::Location& from_here,
1246    const Closure& task) {
1247  DCHECK(!token_name.empty());
1248  return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1249                          from_here, task, TimeDelta());
1250}
1251
1252bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1253    SequenceToken sequence_token,
1254    const tracked_objects::Location& from_here,
1255    const Closure& task,
1256    WorkerShutdown shutdown_behavior) {
1257  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1258                          from_here, task, TimeDelta());
1259}
1260
1261bool SequencedWorkerPool::PostDelayedTask(
1262    const tracked_objects::Location& from_here,
1263    const Closure& task,
1264    TimeDelta delay) {
1265  return PostDelayedWorkerTask(from_here, task, delay);
1266}
1267
1268bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1269  return inner_->RunsTasksOnCurrentThread();
1270}
1271
1272bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1273    SequenceToken sequence_token) const {
1274  return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1275}
1276
1277void SequencedWorkerPool::FlushForTesting() {
1278  inner_->CleanupForTesting();
1279}
1280
1281void SequencedWorkerPool::SignalHasWorkForTesting() {
1282  inner_->SignalHasWorkForTesting();
1283}
1284
1285void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1286  DCHECK(constructor_message_loop_->BelongsToCurrentThread());
1287  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1288}
1289
1290bool SequencedWorkerPool::IsShutdownInProgress() {
1291  return inner_->IsShutdownInProgress();
1292}
1293
1294}  // namespace base
1295