sequenced_worker_pool.cc revision 5d1f7b1de12d16ceb2c938c56701a3e8bfa558f7
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/sequenced_worker_pool.h"
6
7#include <list>
8#include <map>
9#include <set>
10#include <utility>
11#include <vector>
12
13#include "base/atomic_sequence_num.h"
14#include "base/callback.h"
15#include "base/compiler_specific.h"
16#include "base/critical_closure.h"
17#include "base/debug/trace_event.h"
18#include "base/lazy_instance.h"
19#include "base/logging.h"
20#include "base/memory/linked_ptr.h"
21#include "base/message_loop/message_loop_proxy.h"
22#include "base/stl_util.h"
23#include "base/strings/stringprintf.h"
24#include "base/synchronization/condition_variable.h"
25#include "base/synchronization/lock.h"
26#include "base/threading/platform_thread.h"
27#include "base/threading/simple_thread.h"
28#include "base/threading/thread_local.h"
29#include "base/threading/thread_restrictions.h"
30#include "base/time/time.h"
31#include "base/tracked_objects.h"
32
33#if defined(OS_MACOSX)
34#include "base/mac/scoped_nsautorelease_pool.h"
35#endif
36
37#if !defined(OS_NACL)
38#include "base/metrics/histogram.h"
39#endif
40
41namespace base {
42
43namespace {
44
// A unit of work queued in the pool, together with the bookkeeping used to
// order it, trace it, and decide how it is treated at shutdown.
struct SequencedTask : public TrackingInfo  {
  // Creates an empty task: every ID is zero and the shutdown behavior is
  // BLOCK_SHUTDOWN. The TrackingInfo base is default-constructed.
  SequencedTask()
      : sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  // Same defaults as the default constructor, but forwards |from_here| and a
  // default-constructed TimeTicks to the TrackingInfo base.
  explicit SequencedTask(const tracked_objects::Location& from_here)
      : base::TrackingInfo(from_here, TimeTicks()),
        sequence_token_id(0),
        trace_id(0),
        sequence_task_number(0),
        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}

  ~SequencedTask() {}

  // ID of the sequence token the task was posted with.
  int sequence_token_id;
  // Per-task ID used to identify the task in trace events.
  int trace_id;
  // Posting-order number; used as the tie-breaker by SequencedTaskLessThan.
  int64 sequence_task_number;
  // How the task is treated once Shutdown() has been called.
  SequencedWorkerPool::WorkerShutdown shutdown_behavior;
  // Location the task was posted from (reported in trace events).
  tracked_objects::Location posted_from;
  // The closure to run.
  Closure task;

  // Non-delayed tasks and delayed tasks are managed together by time-to-run
  // order. We calculate the time by adding the posted time and the given delay.
  TimeTicks time_to_run;
};
72
73struct SequencedTaskLessThan {
74 public:
75  bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
76    if (lhs.time_to_run < rhs.time_to_run)
77      return true;
78
79    if (lhs.time_to_run > rhs.time_to_run)
80      return false;
81
82    // If the time happen to match, then we use the sequence number to decide.
83    return lhs.sequence_task_number < rhs.sequence_task_number;
84  }
85};
86
87// SequencedWorkerPoolTaskRunner ---------------------------------------------
88// A TaskRunner which posts tasks to a SequencedWorkerPool with a
89// fixed ShutdownBehavior.
90//
91// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
// TaskRunner adapter that forwards every post to |pool_|, attaching the
// shutdown behavior chosen at construction time.
class SequencedWorkerPoolTaskRunner : public TaskRunner {
 public:
  // |pool| is retained via scoped_refptr for the lifetime of this runner;
  // |shutdown_behavior| is the behavior applied to non-delayed posts.
  SequencedWorkerPoolTaskRunner(
      const scoped_refptr<SequencedWorkerPool>& pool,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
                               const Closure& task,
                               TimeDelta delay) OVERRIDE;
  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;

 private:
  // Private destructor: instances are ref-counted (see the note above the
  // class), so they are not meant to be deleted directly.
  virtual ~SequencedWorkerPoolTaskRunner();

  // The pool that receives the tasks.
  const scoped_refptr<SequencedWorkerPool> pool_;

  // Shutdown behavior passed along with non-delayed tasks.
  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
};
113
114SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
115    const scoped_refptr<SequencedWorkerPool>& pool,
116    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
117    : pool_(pool),
118      shutdown_behavior_(shutdown_behavior) {
119}
120
121SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
122}
123
124bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
125    const tracked_objects::Location& from_here,
126    const Closure& task,
127    TimeDelta delay) {
128  if (delay == TimeDelta()) {
129    return pool_->PostWorkerTaskWithShutdownBehavior(
130        from_here, task, shutdown_behavior_);
131  }
132  return pool_->PostDelayedWorkerTask(from_here, task, delay);
133}
134
135bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
136  return pool_->RunsTasksOnCurrentThread();
137}
138
139// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
140// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
141// fixed sequence token.
142//
143// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
// SequencedTaskRunner adapter that forwards every post to |pool_| under the
// sequence token and shutdown behavior chosen at construction time.
class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
 public:
  // |pool| is retained via scoped_refptr; |token| identifies the sequence all
  // tasks are posted to; |shutdown_behavior| applies to non-delayed posts.
  SequencedWorkerPoolSequencedTaskRunner(
      const scoped_refptr<SequencedWorkerPool>& pool,
      SequencedWorkerPool::SequenceToken token,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation
  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
                               const Closure& task,
                               TimeDelta delay) OVERRIDE;
  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;

  // SequencedTaskRunner implementation
  virtual bool PostNonNestableDelayedTask(
      const tracked_objects::Location& from_here,
      const Closure& task,
      TimeDelta delay) OVERRIDE;

 private:
  // Private destructor: instances are ref-counted (see the note above the
  // class), so they are not meant to be deleted directly.
  virtual ~SequencedWorkerPoolSequencedTaskRunner();

  // The pool that receives the tasks.
  const scoped_refptr<SequencedWorkerPool> pool_;

  // Sequence token every task is posted with.
  const SequencedWorkerPool::SequenceToken token_;

  // Shutdown behavior passed along with non-delayed tasks.
  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
};
174
175SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
176    const scoped_refptr<SequencedWorkerPool>& pool,
177    SequencedWorkerPool::SequenceToken token,
178    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
179    : pool_(pool),
180      token_(token),
181      shutdown_behavior_(shutdown_behavior) {
182}
183
184SequencedWorkerPoolSequencedTaskRunner::
185~SequencedWorkerPoolSequencedTaskRunner() {
186}
187
188bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
189    const tracked_objects::Location& from_here,
190    const Closure& task,
191    TimeDelta delay) {
192  if (delay == TimeDelta()) {
193    return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
194        token_, from_here, task, shutdown_behavior_);
195  }
196  return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
197}
198
199bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
200  return pool_->IsRunningSequenceOnCurrentThread(token_);
201}
202
203bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
204    const tracked_objects::Location& from_here,
205    const Closure& task,
206    TimeDelta delay) {
207  // There's no way to run nested tasks, so simply forward to
208  // PostDelayedTask.
209  return PostDelayedTask(from_here, task, delay);
210}
211
212// Create a process-wide unique ID to represent this task in trace events. This
213// will be mangled with a Process ID hash to reduce the likelyhood of colliding
214// with MessageLoop pointers on other processes.
215uint64 GetTaskTraceID(const SequencedTask& task,
216                      void* pool) {
217  return (static_cast<uint64>(task.trace_id) << 32) |
218         static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
219}
220
// Lazily created, leaky thread-local slot holding a pointer to a
// SequenceToken. Worker::Run() points it at the worker's running sequence so
// that static functions can look it up for the current thread.
base::LazyInstance<base::ThreadLocalPointer<
    SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
        LAZY_INSTANCE_INITIALIZER;
224
225}  // namespace
226
227// Worker ---------------------------------------------------------------------
228
// One background thread of the pool. The thread body lives in
// Inner::ThreadLoop(); this object carries the per-thread state (the sequence
// and shutdown behavior of the task currently being run).
class SequencedWorkerPool::Worker : public SimpleThread {
 public:
  // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
  // around as long as we are running.
  // |thread_number| and |thread_name_prefix| are combined into the thread
  // name. The constructor starts the thread.
  Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
         int thread_number,
         const std::string& thread_name_prefix);
  virtual ~Worker();

  // SimpleThread implementation. This actually runs the background thread.
  virtual void Run() OVERRIDE;

  // Records the sequence token and shutdown behavior of the task this worker
  // is running. Inner::ThreadLoop() calls it before a task runs and resets it
  // afterwards.
  void set_running_task_info(SequenceToken token,
                             WorkerShutdown shutdown_behavior) {
    running_sequence_ = token;
    running_shutdown_behavior_ = shutdown_behavior;
  }

  // Sequence token last recorded via set_running_task_info().
  SequenceToken running_sequence() const {
    return running_sequence_;
  }

  // Shutdown behavior last recorded via set_running_task_info().
  WorkerShutdown running_shutdown_behavior() const {
    return running_shutdown_behavior_;
  }

 private:
  // Cyclic reference to the pool; cleared at the end of Run().
  scoped_refptr<SequencedWorkerPool> worker_pool_;
  SequenceToken running_sequence_;
  WorkerShutdown running_shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(Worker);
};
262
263// Inner ----------------------------------------------------------------------
264
// Implementation object behind SequencedWorkerPool: it holds the task queue,
// the worker threads and the shutdown/cleanup state, all guarded by |lock_|.
// Worker threads execute ThreadLoop() on this object.
class SequencedWorkerPool::Inner {
 public:
  // Take a raw pointer to |worker_pool| to avoid cycles (since we're owned
  // by it).
  Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
        const std::string& thread_name_prefix,
        TestingObserver* observer);

  ~Inner();

  // Returns a fresh, unnamed sequence token.
  SequenceToken GetSequenceToken();

  // Returns the token registered under |name|, creating it if necessary.
  SequenceToken GetNamedSequenceToken(const std::string& name);

  // This function accepts a name and an ID. If the name is null, the
  // token ID is used. This allows us to implement the optional name lookup
  // from a single function without having to enter the lock a separate time.
  bool PostTask(const std::string* optional_token_name,
                SequenceToken sequence_token,
                WorkerShutdown shutdown_behavior,
                const tracked_objects::Location& from_here,
                const Closure& task,
                TimeDelta delay);

  // True if the calling thread is one of this pool's worker threads.
  bool RunsTasksOnCurrentThread() const;

  // True if the calling thread is a worker of this pool whose recorded
  // running sequence equals |sequence_token|.
  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;

  void CleanupForTesting();

  void SignalHasWorkForTesting();

  int GetWorkSignalCountForTesting() const;

  void Shutdown(int max_blocking_tasks_after_shutdown);

  // True once Shutdown() has been called.
  bool IsShutdownInProgress();

  // Runs the worker loop on the background thread.
  void ThreadLoop(Worker* this_worker);

 private:
  enum GetWorkStatus {
    GET_WORK_FOUND,
    GET_WORK_NOT_FOUND,
    GET_WORK_WAIT,
  };

  enum CleanupState {
    CLEANUP_REQUESTED,
    CLEANUP_STARTING,
    CLEANUP_RUNNING,
    CLEANUP_FINISHING,
    CLEANUP_DONE,
  };

  // Called from within the lock, this converts the given token name into a
  // token ID, creating a new one if necessary.
  int LockedGetNamedTokenID(const std::string& name);

  // Called from within the lock, this returns the next sequence task number.
  int64 LockedGetNextSequenceTaskNumber();

  // Called from within the lock, returns the shutdown behavior of the task
  // running on the currently executing worker thread. If invoked from a thread
  // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
  WorkerShutdown LockedCurrentThreadShutdownBehavior() const;

  // Gets new task. There are 3 cases depending on the return value:
  //
  // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
  //    be run immediately.
  // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
  //    and |task| is not filled in. In this case, the caller should wait until
  //    a task is posted.
  // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
  //    immediately, and |task| is not filled in. Instead, |wait_time| is
  //    filled in with the time to wait until the next task is due. In this
  //    case, the caller should wait for that long.
  //
  // In any case, the calling code should clear the given
  // delete_these_outside_lock vector the next time the lock is released.
  // See the implementation for a more detailed description.
  GetWorkStatus GetWork(SequencedTask* task,
                        TimeDelta* wait_time,
                        std::vector<Closure>* delete_these_outside_lock);

  // Drives the testing-only cleanup state machine (see |cleanup_state_|).
  // Must be called with the lock held.
  void HandleCleanup();

  // Performs init and cleanup around running the given task. WillRun...
  // returns the value from PrepareToStartAdditionalThreadIfHelpful.
  // The calling code should call FinishStartingAdditionalThread once the
  // lock is released if the return value is nonzero.
  int WillRunWorkerTask(const SequencedTask& task);
  void DidRunWorkerTask(const SequencedTask& task);

  // Returns true if there are no threads currently running the given
  // sequence token.
  bool IsSequenceTokenRunnable(int sequence_token_id) const;

  // Checks if all threads are busy and the addition of one more could run an
  // additional task waiting in the queue. This must be called from within
  // the lock.
  //
  // If another thread is helpful, this will mark the thread as being in the
  // process of starting and returns the number of the new thread. The caller
  // should then call FinishStartingAdditionalThread to complete
  // initialization once the lock is released.
  //
  // If another thread is not necessary, returns 0.
  // NOTE(review): the original comment said the returned index "will be 0 or
  // more", yet callers treat 0 as "no thread needed" -- confirm the numbering
  // against the implementation.
  //
  // See the implementation for more.
  int PrepareToStartAdditionalThreadIfHelpful();

  // The second part of thread creation after
  // PrepareToStartAdditionalThreadIfHelpful with the thread number it
  // generated. This actually creates the thread and should be called outside
  // the lock to avoid blocking important work starting a thread in the lock.
  void FinishStartingAdditionalThread(int thread_number);

  // Wakes a worker waiting for work (presumably by signalling
  // |has_work_cv_|). NOTE(review): the original comment also mentioned
  // incrementing |has_work_signal_count_|, which is not a member declared in
  // this class -- confirm against the implementation.
  void SignalHasWork();

  // Checks whether there is work left that's blocking shutdown. Must be
  // called inside the lock.
  bool CanShutdown() const;

  SequencedWorkerPool* const worker_pool_;

  // The last sequence number used. Managed by GetSequenceToken, since this
  // only does threadsafe increment operations, you do not need to hold the
  // lock. This is class-static to make SequenceTokens issued by
  // GetSequenceToken unique across SequencedWorkerPool instances.
  static base::StaticAtomicSequenceNumber g_last_sequence_number_;

  // This lock protects |everything in this class|. Do not read or modify
  // anything without holding this lock. Do not block while holding this
  // lock.
  mutable Lock lock_;

  // Condition variable that is waited on by worker threads until new
  // tasks are posted or shutdown starts.
  ConditionVariable has_work_cv_;

  // Condition variable that is waited on by non-worker threads (in
  // Shutdown()) until CanShutdown() goes to true.
  ConditionVariable can_shutdown_cv_;

  // The maximum number of worker threads we'll create.
  const size_t max_threads_;

  const std::string thread_name_prefix_;

  // Associates all known sequence token names with their IDs.
  std::map<std::string, int> named_sequence_tokens_;

  // Owning pointers to all threads we've created so far, indexed by
  // ID. Since we lazily create threads, this may be less than
  // max_threads_ and will be initially empty.
  typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
  ThreadMap threads_;

  // Set to true when we're in the process of creating another thread.
  // See PrepareToStartAdditionalThreadIfHelpful for more.
  bool thread_being_created_;

  // Number of threads currently waiting for work.
  size_t waiting_thread_count_;

  // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
  // or SKIP_ON_SHUTDOWN flag set.
  size_t blocking_shutdown_thread_count_;

  // A set of all pending tasks in time-to-run order. These are tasks that are
  // either waiting for a thread to run on, waiting for their time to run,
  // or blocked on a previous task in their sequence. We have to iterate over
  // the tasks by time-to-run order, so we use the set instead of the
  // traditional priority_queue.
  typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
  PendingTaskSet pending_tasks_;

  // The next sequence number for a new sequenced task.
  int64 next_sequence_task_number_;

  // Number of tasks in the pending_tasks_ list that are marked as blocking
  // shutdown.
  size_t blocking_shutdown_pending_task_count_;

  // Lists all sequence tokens currently executing.
  std::set<int> current_sequences_;

  // An ID for each posted task to distinguish the task from others in traces.
  int trace_id_;

  // Set when Shutdown is called and no further tasks should be
  // allowed, though we may still be running existing tasks.
  bool shutdown_called_;

  // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
  // has been called.
  int max_blocking_tasks_after_shutdown_;

  // State used to cleanup for testing, all guarded by lock_.
  CleanupState cleanup_state_;
  size_t cleanup_idlers_;
  ConditionVariable cleanup_cv_;

  // Optional observer notified of shutdown waits and destruction; may be
  // null.
  TestingObserver* const testing_observer_;

  DISALLOW_COPY_AND_ASSIGN(Inner);
};
476
477// Worker definitions ---------------------------------------------------------
478
479SequencedWorkerPool::Worker::Worker(
480    const scoped_refptr<SequencedWorkerPool>& worker_pool,
481    int thread_number,
482    const std::string& prefix)
483    : SimpleThread(
484          prefix + StringPrintf("Worker%d", thread_number).c_str()),
485      worker_pool_(worker_pool),
486      running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
487  Start();
488}
489
490SequencedWorkerPool::Worker::~Worker() {
491}
492
// Thread entry point: publishes this worker's running-sequence slot through
// thread-local storage, runs Inner::ThreadLoop() until it returns, and then
// drops the reference that kept the pool alive.
void SequencedWorkerPool::Worker::Run() {
  // Store a pointer to the running sequence in thread local storage for
  // static function access.
  g_lazy_tls_ptr.Get().Set(&running_sequence_);

  // Just jump back to the Inner object to run the thread, since it has all the
  // tracking information and queues. It might be more natural to implement
  // using DelegateSimpleThread and have Inner implement the Delegate to avoid
  // having these worker objects at all, but that method lacks the ability to
  // send thread-specific information easily to the thread loop.
  worker_pool_->inner_->ThreadLoop(this);
  // Release our cyclic reference once we're done.
  worker_pool_ = NULL;
}
507
508// Inner definitions ---------------------------------------------------------
509
// Initializes an idle pool: no threads, no pending tasks, shutdown not yet
// requested and the testing cleanup state machine at CLEANUP_DONE. All three
// condition variables are bound to |lock_|. No thread is started here.
SequencedWorkerPool::Inner::Inner(
    SequencedWorkerPool* worker_pool,
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : worker_pool_(worker_pool),
      lock_(),
      has_work_cv_(&lock_),
      can_shutdown_cv_(&lock_),
      max_threads_(max_threads),
      thread_name_prefix_(thread_name_prefix),
      thread_being_created_(false),
      waiting_thread_count_(0),
      blocking_shutdown_thread_count_(0),
      next_sequence_task_number_(0),
      blocking_shutdown_pending_task_count_(0),
      trace_id_(0),
      shutdown_called_(false),
      max_blocking_tasks_after_shutdown_(0),
      cleanup_state_(CLEANUP_DONE),
      cleanup_idlers_(0),
      cleanup_cv_(&lock_),
      testing_observer_(observer) {}
533
534SequencedWorkerPool::Inner::~Inner() {
535  // You must call Shutdown() before destroying the pool.
536  DCHECK(shutdown_called_);
537
538  // Need to explicitly join with the threads before they're destroyed or else
539  // they will be running when our object is half torn down.
540  for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
541    it->second->Join();
542  threads_.clear();
543
544  if (testing_observer_)
545    testing_observer_->OnDestruct();
546}
547
548SequencedWorkerPool::SequenceToken
549SequencedWorkerPool::Inner::GetSequenceToken() {
550  // Need to add one because StaticAtomicSequenceNumber starts at zero, which
551  // is used as a sentinel value in SequenceTokens.
552  return SequenceToken(g_last_sequence_number_.GetNext() + 1);
553}
554
555SequencedWorkerPool::SequenceToken
556SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
557  AutoLock lock(lock_);
558  return SequenceToken(LockedGetNamedTokenID(name));
559}
560
561bool SequencedWorkerPool::Inner::PostTask(
562    const std::string* optional_token_name,
563    SequenceToken sequence_token,
564    WorkerShutdown shutdown_behavior,
565    const tracked_objects::Location& from_here,
566    const Closure& task,
567    TimeDelta delay) {
568  DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
569  SequencedTask sequenced(from_here);
570  sequenced.sequence_token_id = sequence_token.id_;
571  sequenced.shutdown_behavior = shutdown_behavior;
572  sequenced.posted_from = from_here;
573  sequenced.task =
574      shutdown_behavior == BLOCK_SHUTDOWN ?
575      base::MakeCriticalClosure(task) : task;
576  sequenced.time_to_run = TimeTicks::Now() + delay;
577
578  int create_thread_id = 0;
579  {
580    AutoLock lock(lock_);
581    if (shutdown_called_) {
582      if (shutdown_behavior != BLOCK_SHUTDOWN ||
583          LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) {
584        return false;
585      }
586      if (max_blocking_tasks_after_shutdown_ <= 0) {
587        DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
588        return false;
589      }
590      max_blocking_tasks_after_shutdown_ -= 1;
591    }
592
593    // The trace_id is used for identifying the task in about:tracing.
594    sequenced.trace_id = trace_id_++;
595
596    TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
597        "SequencedWorkerPool::PostTask",
598        TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))));
599
600    sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
601
602    // Now that we have the lock, apply the named token rules.
603    if (optional_token_name)
604      sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
605
606    pending_tasks_.insert(sequenced);
607    if (shutdown_behavior == BLOCK_SHUTDOWN)
608      blocking_shutdown_pending_task_count_++;
609
610    create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
611  }
612
613  // Actually start the additional thread or signal an existing one now that
614  // we're outside the lock.
615  if (create_thread_id)
616    FinishStartingAdditionalThread(create_thread_id);
617  else
618    SignalHasWork();
619
620  return true;
621}
622
623bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
624  AutoLock lock(lock_);
625  return ContainsKey(threads_, PlatformThread::CurrentId());
626}
627
628bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
629    SequenceToken sequence_token) const {
630  AutoLock lock(lock_);
631  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
632  if (found == threads_.end())
633    return false;
634  return sequence_token.Equals(found->second->running_sequence());
635}
636
637// See https://code.google.com/p/chromium/issues/detail?id=168415
638void SequencedWorkerPool::Inner::CleanupForTesting() {
639  DCHECK(!RunsTasksOnCurrentThread());
640  base::ThreadRestrictions::ScopedAllowWait allow_wait;
641  AutoLock lock(lock_);
642  CHECK_EQ(CLEANUP_DONE, cleanup_state_);
643  if (shutdown_called_)
644    return;
645  if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
646    return;
647  cleanup_state_ = CLEANUP_REQUESTED;
648  cleanup_idlers_ = 0;
649  has_work_cv_.Signal();
650  while (cleanup_state_ != CLEANUP_DONE)
651    cleanup_cv_.Wait();
652}
653
654void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
655  SignalHasWork();
656}
657
658void SequencedWorkerPool::Inner::Shutdown(
659    int max_new_blocking_tasks_after_shutdown) {
660  DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
661  {
662    AutoLock lock(lock_);
663    // Cleanup and Shutdown should not be called concurrently.
664    CHECK_EQ(CLEANUP_DONE, cleanup_state_);
665    if (shutdown_called_)
666      return;
667    shutdown_called_ = true;
668    max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
669
670    // Tickle the threads. This will wake up a waiting one so it will know that
671    // it can exit, which in turn will wake up any other waiting ones.
672    SignalHasWork();
673
674    // There are no pending or running tasks blocking shutdown, we're done.
675    if (CanShutdown())
676      return;
677  }
678
679  // If we're here, then something is blocking shutdown.  So wait for
680  // CanShutdown() to go to true.
681
682  if (testing_observer_)
683    testing_observer_->WillWaitForShutdown();
684
685#if !defined(OS_NACL)
686  TimeTicks shutdown_wait_begin = TimeTicks::Now();
687#endif
688
689  {
690    base::ThreadRestrictions::ScopedAllowWait allow_wait;
691    AutoLock lock(lock_);
692    while (!CanShutdown())
693      can_shutdown_cv_.Wait();
694  }
695#if !defined(OS_NACL)
696  UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
697                      TimeTicks::Now() - shutdown_wait_begin);
698#endif
699}
700
701bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
702    AutoLock lock(lock_);
703    return shutdown_called_;
704}
705
// Body of every worker thread. Registers |this_worker| in |threads_| and then
// loops: take part in any testing cleanup, fetch work, run it with the lock
// released, and otherwise wait on |has_work_cv_|. The loop ends once shutdown
// has been called and no pending task blocks shutdown; on exit the next
// worker and any thread waiting in Shutdown() are signalled.
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
  {
    AutoLock lock(lock_);
    // The creator marked a thread as being created; this thread is it.
    DCHECK(thread_being_created_);
    thread_being_created_ = false;
    // |threads_| takes ownership of the worker, keyed by its thread ID.
    std::pair<ThreadMap::iterator, bool> result =
        threads_.insert(
            std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
    DCHECK(result.second);

    while (true) {
#if defined(OS_MACOSX)
      base::mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

      HandleCleanup();

      // See GetWork for what delete_these_outside_lock is doing.
      SequencedTask task;
      TimeDelta wait_time;
      std::vector<Closure> delete_these_outside_lock;
      GetWorkStatus status =
          GetWork(&task, &wait_time, &delete_these_outside_lock);
      if (status == GET_WORK_FOUND) {
        TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
            "SequencedWorkerPool::PostTask",
            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
        TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop",
                     "src_file", task.posted_from.file_name(),
                     "src_func", task.posted_from.function_name());
        int new_thread_id = WillRunWorkerTask(task);
        {
          // Everything in this scope runs with the lock released.
          AutoUnlock unlock(lock_);
          // There may be more work available, so wake up another
          // worker thread. (Technically not required, since we
          // already get a signal for each new task, but it doesn't
          // hurt.)
          SignalHasWork();
          delete_these_outside_lock.clear();

          // Complete thread creation outside the lock if necessary.
          if (new_thread_id)
            FinishStartingAdditionalThread(new_thread_id);

          this_worker->set_running_task_info(
              SequenceToken(task.sequence_token_id), task.shutdown_behavior);

          tracked_objects::TrackedTime start_time =
              tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);

          task.task.Run();

          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
              start_time, tracked_objects::ThreadData::NowForEndOfRun());

          // Make sure our task is erased outside the lock for the
          // same reason we do this with delete_these_outside_lock.
          // Also, do it before calling set_running_task_info() so
          // that sequence-checking from within the task's destructor
          // still works.
          task.task = Closure();

          this_worker->set_running_task_info(
              SequenceToken(), CONTINUE_ON_SHUTDOWN);
        }
        DidRunWorkerTask(task);  // Must be done inside the lock.
      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        // This thread is the one performing the testing cleanup and GetWork
        // found nothing runnable right now.
        switch (status) {
          case GET_WORK_WAIT: {
              // Only drop the lock briefly to destroy the collected closures.
              AutoUnlock unlock(lock_);
              delete_these_outside_lock.clear();
            }
            break;
          case GET_WORK_NOT_FOUND:
            // No tasks at all: move the cleanup on to its final phase.
            CHECK(delete_these_outside_lock.empty());
            cleanup_state_ = CLEANUP_FINISHING;
            cleanup_cv_.Broadcast();
            break;
          default:
            NOTREACHED();
        }
      } else {
        // When we're terminating and there's no more work, we can
        // shut down, other workers can complete any pending or new tasks.
        // We can get additional tasks posted after shutdown_called_ is set
        // but only worker threads are allowed to post tasks at that time, and
        // the workers responsible for posting those tasks will be available
        // to run them. Also, there may be some tasks stuck behind running
        // ones with the same sequence token, but additional threads won't
        // help this case.
        if (shutdown_called_ &&
            blocking_shutdown_pending_task_count_ == 0)
          break;
        waiting_thread_count_++;

        switch (status) {
          case GET_WORK_NOT_FOUND:
            // Nothing queued: sleep until signalled.
            has_work_cv_.Wait();
            break;
          case GET_WORK_WAIT:
            // Only delayed work queued: sleep at most until it is due.
            has_work_cv_.TimedWait(wait_time);
            break;
          default:
            NOTREACHED();
        }
        waiting_thread_count_--;
      }
    }
  }  // Release lock_.

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  SignalHasWork();

  // Possibly unblock shutdown.
  can_shutdown_cv_.Signal();
}
823
824void SequencedWorkerPool::Inner::HandleCleanup() {
825  lock_.AssertAcquired();
826  if (cleanup_state_ == CLEANUP_DONE)
827    return;
828  if (cleanup_state_ == CLEANUP_REQUESTED) {
829    // We win, we get to do the cleanup as soon as the others wise up and idle.
830    cleanup_state_ = CLEANUP_STARTING;
831    while (thread_being_created_ ||
832           cleanup_idlers_ != threads_.size() - 1) {
833      has_work_cv_.Signal();
834      cleanup_cv_.Wait();
835    }
836    cleanup_state_ = CLEANUP_RUNNING;
837    return;
838  }
839  if (cleanup_state_ == CLEANUP_STARTING) {
840    // Another worker thread is cleaning up, we idle here until thats done.
841    ++cleanup_idlers_;
842    cleanup_cv_.Broadcast();
843    while (cleanup_state_ != CLEANUP_FINISHING) {
844      cleanup_cv_.Wait();
845    }
846    --cleanup_idlers_;
847    cleanup_cv_.Broadcast();
848    return;
849  }
850  if (cleanup_state_ == CLEANUP_FINISHING) {
851    // We wait for all idlers to wake up prior to being DONE.
852    while (cleanup_idlers_ != 0) {
853      cleanup_cv_.Broadcast();
854      cleanup_cv_.Wait();
855    }
856    if (cleanup_state_ == CLEANUP_FINISHING) {
857      cleanup_state_ = CLEANUP_DONE;
858      cleanup_cv_.Signal();
859    }
860    return;
861  }
862}
863
864int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
865    const std::string& name) {
866  lock_.AssertAcquired();
867  DCHECK(!name.empty());
868
869  std::map<std::string, int>::const_iterator found =
870      named_sequence_tokens_.find(name);
871  if (found != named_sequence_tokens_.end())
872    return found->second;  // Got an existing one.
873
874  // Create a new one for this name.
875  SequenceToken result = GetSequenceToken();
876  named_sequence_tokens_.insert(std::make_pair(name, result.id_));
877  return result.id_;
878}
879
880int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
881  lock_.AssertAcquired();
882  // We assume that we never create enough tasks to wrap around.
883  return next_sequence_task_number_++;
884}
885
886SequencedWorkerPool::WorkerShutdown
887SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
888  lock_.AssertAcquired();
889  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
890  if (found == threads_.end())
891    return CONTINUE_ON_SHUTDOWN;
892  return found->second->running_shutdown_behavior();
893}
894
895SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
896    SequencedTask* task,
897    TimeDelta* wait_time,
898    std::vector<Closure>* delete_these_outside_lock) {
899  lock_.AssertAcquired();
900
901#if !defined(OS_NACL)
902  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
903                           static_cast<int>(pending_tasks_.size()));
904#endif
905
906  // Find the next task with a sequence token that's not currently in use.
907  // If the token is in use, that means another thread is running something
908  // in that sequence, and we can't run it without going out-of-order.
909  //
910  // This algorithm is simple and fair, but inefficient in some cases. For
911  // example, say somebody schedules 1000 slow tasks with the same sequence
912  // number. We'll have to go through all those tasks each time we feel like
913  // there might be work to schedule. If this proves to be a problem, we
914  // should make this more efficient.
915  //
916  // One possible enhancement would be to keep a map from sequence ID to a
917  // list of pending but currently blocked SequencedTasks for that ID.
918  // When a worker finishes a task of one sequence token, it can pick up the
919  // next one from that token right away.
920  //
921  // This may lead to starvation if there are sufficient numbers of sequences
922  // in use. To alleviate this, we could add an incrementing priority counter
923  // to each SequencedTask. Then maintain a priority_queue of all runnable
924  // tasks, sorted by priority counter. When a sequenced task is completed
925  // we would pop the head element off of that tasks pending list and add it
926  // to the priority queue. Then we would run the first item in the priority
927  // queue.
928
929  GetWorkStatus status = GET_WORK_NOT_FOUND;
930  int unrunnable_tasks = 0;
931  PendingTaskSet::iterator i = pending_tasks_.begin();
932  // We assume that the loop below doesn't take too long and so we can just do
933  // a single call to TimeTicks::Now().
934  const TimeTicks current_time = TimeTicks::Now();
935  while (i != pending_tasks_.end()) {
936    if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
937      unrunnable_tasks++;
938      ++i;
939      continue;
940    }
941
942    if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
943      // We're shutting down and the task we just found isn't blocking
944      // shutdown. Delete it and get more work.
945      //
946      // Note that we do not want to delete unrunnable tasks. Deleting a task
947      // can have side effects (like freeing some objects) and deleting a
948      // task that's supposed to run after one that's currently running could
949      // cause an obscure crash.
950      //
951      // We really want to delete these tasks outside the lock in case the
952      // closures are holding refs to objects that want to post work from
953      // their destructorss (which would deadlock). The closures are
954      // internally refcounted, so we just need to keep a copy of them alive
955      // until the lock is exited. The calling code can just clear() the
956      // vector they passed to us once the lock is exited to make this
957      // happen.
958      delete_these_outside_lock->push_back(i->task);
959      pending_tasks_.erase(i++);
960      continue;
961    }
962
963    if (i->time_to_run > current_time) {
964      // The time to run has not come yet.
965      *wait_time = i->time_to_run - current_time;
966      status = GET_WORK_WAIT;
967      if (cleanup_state_ == CLEANUP_RUNNING) {
968        // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
969        delete_these_outside_lock->push_back(i->task);
970        pending_tasks_.erase(i);
971      }
972      break;
973    }
974
975    // Found a runnable task.
976    *task = *i;
977    pending_tasks_.erase(i);
978    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
979      blocking_shutdown_pending_task_count_--;
980    }
981
982    status = GET_WORK_FOUND;
983    break;
984  }
985
986  // Track the number of tasks we had to skip over to see if we should be
987  // making this more efficient. If this number ever becomes large or is
988  // frequently "some", we should consider the optimization above.
989#if !defined(OS_NACL)
990  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
991                           unrunnable_tasks);
992#endif
993  return status;
994}
995
996int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
997  lock_.AssertAcquired();
998
999  // Mark the task's sequence number as in use.
1000  if (task.sequence_token_id)
1001    current_sequences_.insert(task.sequence_token_id);
1002
1003  // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1004  // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1005  // completes.
1006  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
1007    blocking_shutdown_thread_count_++;
1008
1009  // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1010  // creates a new thread if there is no free one, there is a race when posting
1011  // tasks that many tasks could have been posted before a thread started
1012  // running them, so only one thread would have been created. So we also check
1013  // whether we should create more threads after removing our task from the
1014  // queue, which also has the nice side effect of creating the workers from
1015  // background threads rather than the main thread of the app.
1016  //
1017  // If another thread wasn't created, we want to wake up an existing thread
1018  // if there is one waiting to pick up the next task.
1019  //
1020  // Note that we really need to do this *before* running the task, not
1021  // after. Otherwise, if more than one task is posted, the creation of the
1022  // second thread (since we only create one at a time) will be blocked by
1023  // the execution of the first task, which could be arbitrarily long.
1024  return PrepareToStartAdditionalThreadIfHelpful();
1025}
1026
1027void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1028  lock_.AssertAcquired();
1029
1030  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1031    DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1032    blocking_shutdown_thread_count_--;
1033  }
1034
1035  if (task.sequence_token_id)
1036    current_sequences_.erase(task.sequence_token_id);
1037}
1038
1039bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1040    int sequence_token_id) const {
1041  lock_.AssertAcquired();
1042  return !sequence_token_id ||
1043      current_sequences_.find(sequence_token_id) ==
1044          current_sequences_.end();
1045}
1046
1047int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1048  lock_.AssertAcquired();
1049  // How thread creation works:
1050  //
1051  // We'de like to avoid creating threads with the lock held. However, we
1052  // need to be sure that we have an accurate accounting of the threads for
1053  // proper Joining and deltion on shutdown.
1054  //
1055  // We need to figure out if we need another thread with the lock held, which
1056  // is what this function does. It then marks us as in the process of creating
1057  // a thread. When we do shutdown, we wait until the thread_being_created_
1058  // flag is cleared, which ensures that the new thread is properly added to
1059  // all the data structures and we can't leak it. Once shutdown starts, we'll
1060  // refuse to create more threads or they would be leaked.
1061  //
1062  // Note that this creates a mostly benign race condition on shutdown that
1063  // will cause fewer workers to be created than one would expect. It isn't
1064  // much of an issue in real life, but affects some tests. Since we only spawn
1065  // one worker at a time, the following sequence of events can happen:
1066  //
1067  //  1. Main thread posts a bunch of unrelated tasks that would normally be
1068  //     run on separate threads.
1069  //  2. The first task post causes us to start a worker. Other tasks do not
1070  //     cause a worker to start since one is pending.
1071  //  3. Main thread initiates shutdown.
1072  //  4. No more threads are created since the shutdown_called_ flag is set.
1073  //
1074  // The result is that one may expect that max_threads_ workers to be created
1075  // given the workload, but in reality fewer may be created because the
1076  // sequence of thread creation on the background threads is racing with the
1077  // shutdown call.
1078  if (!shutdown_called_ &&
1079      !thread_being_created_ &&
1080      cleanup_state_ == CLEANUP_DONE &&
1081      threads_.size() < max_threads_ &&
1082      waiting_thread_count_ == 0) {
1083    // We could use an additional thread if there's work to be done.
1084    for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1085         i != pending_tasks_.end(); ++i) {
1086      if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1087        // Found a runnable task, mark the thread as being started.
1088        thread_being_created_ = true;
1089        return static_cast<int>(threads_.size() + 1);
1090      }
1091    }
1092  }
1093  return 0;
1094}
1095
1096void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1097    int thread_number) {
1098  // Called outside of the lock.
1099  DCHECK(thread_number > 0);
1100
1101  // The worker is assigned to the list when the thread actually starts, which
1102  // will manage the memory of the pointer.
1103  new Worker(worker_pool_, thread_number, thread_name_prefix_);
1104}
1105
1106void SequencedWorkerPool::Inner::SignalHasWork() {
1107  has_work_cv_.Signal();
1108  if (testing_observer_) {
1109    testing_observer_->OnHasWork();
1110  }
1111}
1112
1113bool SequencedWorkerPool::Inner::CanShutdown() const {
1114  lock_.AssertAcquired();
1115  // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1116  return !thread_being_created_ &&
1117         blocking_shutdown_thread_count_ == 0 &&
1118         blocking_shutdown_pending_task_count_ == 0;
1119}
1120
1121base::StaticAtomicSequenceNumber
1122SequencedWorkerPool::Inner::g_last_sequence_number_;
1123
1124// SequencedWorkerPool --------------------------------------------------------
1125
1126// static
1127SequencedWorkerPool::SequenceToken
1128SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1129  // Don't construct lazy instance on check.
1130  if (g_lazy_tls_ptr == NULL)
1131    return SequenceToken();
1132
1133  SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
1134  if (!token)
1135    return SequenceToken();
1136  return *token;
1137}
1138
1139SequencedWorkerPool::SequencedWorkerPool(
1140    size_t max_threads,
1141    const std::string& thread_name_prefix)
1142    : constructor_message_loop_(MessageLoopProxy::current()),
1143      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1144}
1145
1146SequencedWorkerPool::SequencedWorkerPool(
1147    size_t max_threads,
1148    const std::string& thread_name_prefix,
1149    TestingObserver* observer)
1150    : constructor_message_loop_(MessageLoopProxy::current()),
1151      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1152}
1153
1154SequencedWorkerPool::~SequencedWorkerPool() {}
1155
1156void SequencedWorkerPool::OnDestruct() const {
1157  DCHECK(constructor_message_loop_.get());
1158  // Avoid deleting ourselves on a worker thread (which would
1159  // deadlock).
1160  if (RunsTasksOnCurrentThread()) {
1161    constructor_message_loop_->DeleteSoon(FROM_HERE, this);
1162  } else {
1163    delete this;
1164  }
1165}
1166
1167SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1168  return inner_->GetSequenceToken();
1169}
1170
1171SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1172    const std::string& name) {
1173  return inner_->GetNamedSequenceToken(name);
1174}
1175
1176scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1177    SequenceToken token) {
1178  return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1179}
1180
1181scoped_refptr<SequencedTaskRunner>
1182SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1183    SequenceToken token, WorkerShutdown shutdown_behavior) {
1184  return new SequencedWorkerPoolSequencedTaskRunner(
1185      this, token, shutdown_behavior);
1186}
1187
1188scoped_refptr<TaskRunner>
1189SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1190    WorkerShutdown shutdown_behavior) {
1191  return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1192}
1193
1194bool SequencedWorkerPool::PostWorkerTask(
1195    const tracked_objects::Location& from_here,
1196    const Closure& task) {
1197  return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1198                          from_here, task, TimeDelta());
1199}
1200
1201bool SequencedWorkerPool::PostDelayedWorkerTask(
1202    const tracked_objects::Location& from_here,
1203    const Closure& task,
1204    TimeDelta delay) {
1205  WorkerShutdown shutdown_behavior =
1206      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1207  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1208                          from_here, task, delay);
1209}
1210
1211bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1212    const tracked_objects::Location& from_here,
1213    const Closure& task,
1214    WorkerShutdown shutdown_behavior) {
1215  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1216                          from_here, task, TimeDelta());
1217}
1218
1219bool SequencedWorkerPool::PostSequencedWorkerTask(
1220    SequenceToken sequence_token,
1221    const tracked_objects::Location& from_here,
1222    const Closure& task) {
1223  return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1224                          from_here, task, TimeDelta());
1225}
1226
1227bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1228    SequenceToken sequence_token,
1229    const tracked_objects::Location& from_here,
1230    const Closure& task,
1231    TimeDelta delay) {
1232  WorkerShutdown shutdown_behavior =
1233      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1234  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1235                          from_here, task, delay);
1236}
1237
1238bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1239    const std::string& token_name,
1240    const tracked_objects::Location& from_here,
1241    const Closure& task) {
1242  DCHECK(!token_name.empty());
1243  return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1244                          from_here, task, TimeDelta());
1245}
1246
1247bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1248    SequenceToken sequence_token,
1249    const tracked_objects::Location& from_here,
1250    const Closure& task,
1251    WorkerShutdown shutdown_behavior) {
1252  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1253                          from_here, task, TimeDelta());
1254}
1255
1256bool SequencedWorkerPool::PostDelayedTask(
1257    const tracked_objects::Location& from_here,
1258    const Closure& task,
1259    TimeDelta delay) {
1260  return PostDelayedWorkerTask(from_here, task, delay);
1261}
1262
1263bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1264  return inner_->RunsTasksOnCurrentThread();
1265}
1266
1267bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1268    SequenceToken sequence_token) const {
1269  return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1270}
1271
1272void SequencedWorkerPool::FlushForTesting() {
1273  inner_->CleanupForTesting();
1274}
1275
1276void SequencedWorkerPool::SignalHasWorkForTesting() {
1277  inner_->SignalHasWorkForTesting();
1278}
1279
1280void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1281  DCHECK(constructor_message_loop_->BelongsToCurrentThread());
1282  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1283}
1284
1285bool SequencedWorkerPool::IsShutdownInProgress() {
1286  return inner_->IsShutdownInProgress();
1287}
1288
1289}  // namespace base
1290