sequenced_worker_pool.cc revision eb525c5499e34cc9c4b825d6d9e75bb07cc06ace
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/sequenced_worker_pool.h"
6
7#include <list>
8#include <map>
9#include <set>
10#include <utility>
11#include <vector>
12
13#include "base/atomicops.h"
14#include "base/callback.h"
15#include "base/compiler_specific.h"
16#include "base/critical_closure.h"
17#include "base/debug/trace_event.h"
18#include "base/logging.h"
19#include "base/memory/linked_ptr.h"
20#include "base/message_loop/message_loop_proxy.h"
21#include "base/metrics/histogram.h"
22#include "base/stl_util.h"
23#include "base/strings/stringprintf.h"
24#include "base/synchronization/condition_variable.h"
25#include "base/synchronization/lock.h"
26#include "base/threading/platform_thread.h"
27#include "base/threading/simple_thread.h"
28#include "base/threading/thread_restrictions.h"
29#include "base/time/time.h"
30#include "base/tracked_objects.h"
31
32#if defined(OS_MACOSX)
33#include "base/mac/scoped_nsautorelease_pool.h"
34#endif
35
36namespace base {
37
38namespace {
39
40struct SequencedTask : public TrackingInfo  {
41  SequencedTask()
42      : sequence_token_id(0),
43        trace_id(0),
44        sequence_task_number(0),
45        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
46
47  explicit SequencedTask(const tracked_objects::Location& from_here)
48      : base::TrackingInfo(from_here, TimeTicks()),
49        sequence_token_id(0),
50        trace_id(0),
51        sequence_task_number(0),
52        shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
53
54  ~SequencedTask() {}
55
56  int sequence_token_id;
57  int trace_id;
58  int64 sequence_task_number;
59  SequencedWorkerPool::WorkerShutdown shutdown_behavior;
60  tracked_objects::Location posted_from;
61  Closure task;
62
63  // Non-delayed tasks and delayed tasks are managed together by time-to-run
64  // order. We calculate the time by adding the posted time and the given delay.
65  TimeTicks time_to_run;
66};
67
68struct SequencedTaskLessThan {
69 public:
70  bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
71    if (lhs.time_to_run < rhs.time_to_run)
72      return true;
73
74    if (lhs.time_to_run > rhs.time_to_run)
75      return false;
76
77    // If the time happen to match, then we use the sequence number to decide.
78    return lhs.sequence_task_number < rhs.sequence_task_number;
79  }
80};
81
82// SequencedWorkerPoolTaskRunner ---------------------------------------------
83// A TaskRunner which posts tasks to a SequencedWorkerPool with a
84// fixed ShutdownBehavior.
85//
86// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
87class SequencedWorkerPoolTaskRunner : public TaskRunner {
88 public:
89  SequencedWorkerPoolTaskRunner(
90      const scoped_refptr<SequencedWorkerPool>& pool,
91      SequencedWorkerPool::WorkerShutdown shutdown_behavior);
92
93  // TaskRunner implementation
94  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
95                               const Closure& task,
96                               TimeDelta delay) OVERRIDE;
97  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
98
99 private:
100  virtual ~SequencedWorkerPoolTaskRunner();
101
102  const scoped_refptr<SequencedWorkerPool> pool_;
103
104  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
105
106  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
107};
108
109SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
110    const scoped_refptr<SequencedWorkerPool>& pool,
111    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
112    : pool_(pool),
113      shutdown_behavior_(shutdown_behavior) {
114}
115
116SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
117}
118
119bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
120    const tracked_objects::Location& from_here,
121    const Closure& task,
122    TimeDelta delay) {
123  if (delay == TimeDelta()) {
124    return pool_->PostWorkerTaskWithShutdownBehavior(
125        from_here, task, shutdown_behavior_);
126  }
127  return pool_->PostDelayedWorkerTask(from_here, task, delay);
128}
129
130bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
131  return pool_->RunsTasksOnCurrentThread();
132}
133
134// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
135// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
136// fixed sequence token.
137//
138// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
139class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
140 public:
141  SequencedWorkerPoolSequencedTaskRunner(
142      const scoped_refptr<SequencedWorkerPool>& pool,
143      SequencedWorkerPool::SequenceToken token,
144      SequencedWorkerPool::WorkerShutdown shutdown_behavior);
145
146  // TaskRunner implementation
147  virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
148                               const Closure& task,
149                               TimeDelta delay) OVERRIDE;
150  virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
151
152  // SequencedTaskRunner implementation
153  virtual bool PostNonNestableDelayedTask(
154      const tracked_objects::Location& from_here,
155      const Closure& task,
156      TimeDelta delay) OVERRIDE;
157
158 private:
159  virtual ~SequencedWorkerPoolSequencedTaskRunner();
160
161  const scoped_refptr<SequencedWorkerPool> pool_;
162
163  const SequencedWorkerPool::SequenceToken token_;
164
165  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
166
167  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
168};
169
170SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
171    const scoped_refptr<SequencedWorkerPool>& pool,
172    SequencedWorkerPool::SequenceToken token,
173    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
174    : pool_(pool),
175      token_(token),
176      shutdown_behavior_(shutdown_behavior) {
177}
178
179SequencedWorkerPoolSequencedTaskRunner::
180~SequencedWorkerPoolSequencedTaskRunner() {
181}
182
183bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
184    const tracked_objects::Location& from_here,
185    const Closure& task,
186    TimeDelta delay) {
187  if (delay == TimeDelta()) {
188    return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
189        token_, from_here, task, shutdown_behavior_);
190  }
191  return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
192}
193
194bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
195  return pool_->IsRunningSequenceOnCurrentThread(token_);
196}
197
198bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
199    const tracked_objects::Location& from_here,
200    const Closure& task,
201    TimeDelta delay) {
202  // There's no way to run nested tasks, so simply forward to
203  // PostDelayedTask.
204  return PostDelayedTask(from_here, task, delay);
205}
206
207// Create a process-wide unique ID to represent this task in trace events. This
208// will be mangled with a Process ID hash to reduce the likelyhood of colliding
209// with MessageLoop pointers on other processes.
210uint64 GetTaskTraceID(const SequencedTask& task,
211                      void* pool) {
212  return (static_cast<uint64>(task.trace_id) << 32) |
213         static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
214}
215
216}  // namespace
217
218// Worker ---------------------------------------------------------------------
219
220class SequencedWorkerPool::Worker : public SimpleThread {
221 public:
222  // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
223  // around as long as we are running.
224  Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
225         int thread_number,
226         const std::string& thread_name_prefix);
227  virtual ~Worker();
228
229  // SimpleThread implementation. This actually runs the background thread.
230  virtual void Run() OVERRIDE;
231
232  void set_running_task_info(SequenceToken token,
233                             WorkerShutdown shutdown_behavior) {
234    running_sequence_ = token;
235    running_shutdown_behavior_ = shutdown_behavior;
236  }
237
238  SequenceToken running_sequence() const {
239    return running_sequence_;
240  }
241
242  WorkerShutdown running_shutdown_behavior() const {
243    return running_shutdown_behavior_;
244  }
245
246 private:
247  scoped_refptr<SequencedWorkerPool> worker_pool_;
248  SequenceToken running_sequence_;
249  WorkerShutdown running_shutdown_behavior_;
250
251  DISALLOW_COPY_AND_ASSIGN(Worker);
252};
253
254// Inner ----------------------------------------------------------------------
255
// Holds the pool's state (task queue, worker threads, shutdown and cleanup
// bookkeeping) and the logic that operates on it.
256class SequencedWorkerPool::Inner {
257 public:
258  // Take a raw pointer to |worker_pool| to avoid cycles (since we're owned
259  // by it).
260  Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
261        const std::string& thread_name_prefix,
262        TestingObserver* observer);
263
264  ~Inner();
265
266  SequenceToken GetSequenceToken();
267
268  SequenceToken GetNamedSequenceToken(const std::string& name);
269
270  // This function accepts a name and an ID. If the name is null, the
271  // token ID is used. This allows us to implement the optional name lookup
272  // from a single function without having to enter the lock a separate time.
273  bool PostTask(const std::string* optional_token_name,
274                SequenceToken sequence_token,
275                WorkerShutdown shutdown_behavior,
276                const tracked_objects::Location& from_here,
277                const Closure& task,
278                TimeDelta delay);
279
280  bool RunsTasksOnCurrentThread() const;
281
282  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
283
284  void CleanupForTesting();
285
286  void SignalHasWorkForTesting();
287
288  int GetWorkSignalCountForTesting() const;
289
290  void Shutdown(int max_blocking_tasks_after_shutdown);
291
292  // Runs the worker loop on the background thread.
293  void ThreadLoop(Worker* this_worker);
294
295 private:
  // Result of GetWork(); see the comment on GetWork() below.
296  enum GetWorkStatus {
297    GET_WORK_FOUND,
298    GET_WORK_NOT_FOUND,
299    GET_WORK_WAIT,
300  };
301
  // Progress of a CleanupForTesting() request. CLEANUP_DONE is the resting
  // state when no cleanup is in progress.
302  enum CleanupState {
303    CLEANUP_REQUESTED,
304    CLEANUP_STARTING,
305    CLEANUP_RUNNING,
306    CLEANUP_FINISHING,
307    CLEANUP_DONE,
308  };
309
310  // Called from within the lock, this converts the given token name into a
311  // token ID, creating a new one if necessary.
312  int LockedGetNamedTokenID(const std::string& name);
313
314  // Called from within the lock, this returns the next sequence task number.
315  int64 LockedGetNextSequenceTaskNumber();
316
317  // Called from within the lock, returns the shutdown behavior of the task
318  // running on the currently executing worker thread. If invoked from a thread
319  // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
320  WorkerShutdown LockedCurrentThreadShutdownBehavior() const;
321
322  // Gets new task. There are 3 cases depending on the return value:
323  //
324  // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
325  //    be run immediately.
326  // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
327  //    and |task| is not filled in. In this case, the caller should wait until
328  //    a task is posted.
329  // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
330  //    immediately, and |task| is not filled in. Instead, |wait_time| is
331  //    filled in with the time to wait until the next task is due to run. In
332  //    this case, the caller should wait for that long.
333  //
334  // In any case, the calling code should clear the given
335  // delete_these_outside_lock vector the next time the lock is released.
336  // See the implementation for a more detailed description.
337  GetWorkStatus GetWork(SequencedTask* task,
338                        TimeDelta* wait_time,
339                        std::vector<Closure>* delete_these_outside_lock);
340
  // Advances the CleanupForTesting() state machine; must be called with the
  // lock held.
341  void HandleCleanup();
342
343  // Performs init and cleanup around running the given task. WillRun...
344  // returns the value from PrepareToStartAdditionalThreadIfHelpful.
345  // The calling code should call FinishStartingAdditionalThread once the
346  // lock is released if the return value is nonzero.
347  int WillRunWorkerTask(const SequencedTask& task);
348  void DidRunWorkerTask(const SequencedTask& task);
349
350  // Returns true if there are no threads currently running the given
351  // sequence token.
352  bool IsSequenceTokenRunnable(int sequence_token_id) const;
353
354  // Checks if all threads are busy and the addition of one more could run an
355  // additional task waiting in the queue. This must be called from within
356  // the lock.
357  //
358  // If another thread is helpful, this will mark the thread as being in the
359  // process of starting and returns the number of the new thread, which is
360  // nonzero. The caller should then call FinishStartingAdditionalThread to
361  // complete initialization once the lock is released.
362  //
363  // If another thread is not necessary, returns 0.
364  //
365  // See the implementation for more.
366  int PrepareToStartAdditionalThreadIfHelpful();
367
368  // The second part of thread creation after
369  // PrepareToStartAdditionalThreadIfHelpful with the thread number it
370  // generated. This actually creates the thread and should be called outside
371  // the lock to avoid blocking important work starting a thread in the lock.
372  void FinishStartingAdditionalThread(int thread_number);
373
374  // Signal |has_work_| and increment |has_work_signal_count_|.
375  void SignalHasWork();
376
377  // Checks whether there is work left that's blocking shutdown. Must be
378  // called inside the lock.
379  bool CanShutdown() const;
380
381  SequencedWorkerPool* const worker_pool_;
382
383  // The last sequence number used. Managed by GetSequenceToken, since this
384  // only does threadsafe increment operations, you do not need to hold the
385  // lock.
386  volatile subtle::Atomic32 last_sequence_number_;
387
388  // This lock protects |everything in this class|. Do not read or modify
389  // anything without holding this lock. Do not block while holding this
390  // lock.
391  mutable Lock lock_;
392
393  // Condition variable that is waited on by worker threads until new
394  // tasks are posted or shutdown starts.
395  ConditionVariable has_work_cv_;
396
397  // Condition variable that is waited on by non-worker threads (in
398  // Shutdown()) until CanShutdown() goes to true.
399  ConditionVariable can_shutdown_cv_;
400
401  // The maximum number of worker threads we'll create.
402  const size_t max_threads_;
403
  // Prefix prepended to the name of every worker thread.
404  const std::string thread_name_prefix_;
405
406  // Associates all known sequence token names with their IDs.
407  std::map<std::string, int> named_sequence_tokens_;
408
409  // Owning pointers to all threads we've created so far, indexed by
410  // ID. Since we lazily create threads, this may be less than
411  // max_threads_ and will be initially empty.
412  typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
413  ThreadMap threads_;
414
415  // Set to true when we're in the process of creating another thread.
416  // See PrepareToStartAdditionalThreadIfHelpful for more.
417  bool thread_being_created_;
418
419  // Number of threads currently waiting for work.
420  size_t waiting_thread_count_;
421
422  // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
423  // or SKIP_ON_SHUTDOWN flag set.
424  size_t blocking_shutdown_thread_count_;
425
426  // A set of all pending tasks in time-to-run order. These are tasks that are
427  // either waiting for a thread to run on, waiting for their time to run,
428  // or blocked on a previous task in their sequence. We have to iterate over
429  // the tasks by time-to-run order, so we use the set instead of the
430  // traditional priority_queue.
431  typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
432  PendingTaskSet pending_tasks_;
433
434  // The next sequence number for a new sequenced task.
435  int64 next_sequence_task_number_;
436
437  // Number of tasks in the pending_tasks_ list that are marked as blocking
438  // shutdown.
439  size_t blocking_shutdown_pending_task_count_;
440
441  // Lists all sequence tokens currently executing.
442  std::set<int> current_sequences_;
443
444  // An ID for each posted task to distinguish the task from others in traces.
445  int trace_id_;
446
447  // Set when Shutdown is called and no further tasks should be
448  // allowed, though we may still be running existing tasks.
449  bool shutdown_called_;
450
451  // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
452  // has been called.
453  int max_blocking_tasks_after_shutdown_;
454
455  // State used to cleanup for testing, all guarded by lock_.
456  CleanupState cleanup_state_;
457  size_t cleanup_idlers_;
458  ConditionVariable cleanup_cv_;
459
  // Optional observer notified of shutdown waits and destruction; may be NULL.
460  TestingObserver* const testing_observer_;
461
462  DISALLOW_COPY_AND_ASSIGN(Inner);
463};
464
465// Worker definitions ---------------------------------------------------------
466
467SequencedWorkerPool::Worker::Worker(
468    const scoped_refptr<SequencedWorkerPool>& worker_pool,
469    int thread_number,
470    const std::string& prefix)
471    : SimpleThread(
472          prefix + StringPrintf("Worker%d", thread_number).c_str()),
473      worker_pool_(worker_pool),
474      running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
475  Start();
476}
477
478SequencedWorkerPool::Worker::~Worker() {
479}
480
481void SequencedWorkerPool::Worker::Run() {
482  // Just jump back to the Inner object to run the thread, since it has all the
483  // tracking information and queues. It might be more natural to implement
484  // using DelegateSimpleThread and have Inner implement the Delegate to avoid
485  // having these worker objects at all, but that method lacks the ability to
486  // send thread-specific information easily to the thread loop.
487  worker_pool_->inner_->ThreadLoop(this);
488  // Release our cyclic reference once we're done.
489  worker_pool_ = NULL;
490}
491
492// Inner definitions ---------------------------------------------------------
493
494SequencedWorkerPool::Inner::Inner(
495    SequencedWorkerPool* worker_pool,
496    size_t max_threads,
497    const std::string& thread_name_prefix,
498    TestingObserver* observer)
499    : worker_pool_(worker_pool),
500      last_sequence_number_(0),
501      lock_(),
502      has_work_cv_(&lock_),
503      can_shutdown_cv_(&lock_),
504      max_threads_(max_threads),
505      thread_name_prefix_(thread_name_prefix),
506      thread_being_created_(false),
507      waiting_thread_count_(0),
508      blocking_shutdown_thread_count_(0),
509      next_sequence_task_number_(0),
510      blocking_shutdown_pending_task_count_(0),
511      trace_id_(0),
512      shutdown_called_(false),
513      max_blocking_tasks_after_shutdown_(0),
514      cleanup_state_(CLEANUP_DONE),
515      cleanup_idlers_(0),
516      cleanup_cv_(&lock_),
517      testing_observer_(observer) {}
518
519SequencedWorkerPool::Inner::~Inner() {
520  // You must call Shutdown() before destroying the pool.
521  DCHECK(shutdown_called_);
522
523  // Need to explicitly join with the threads before they're destroyed or else
524  // they will be running when our object is half torn down.
525  for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
526    it->second->Join();
527  threads_.clear();
528
529  if (testing_observer_)
530    testing_observer_->OnDestruct();
531}
532
533SequencedWorkerPool::SequenceToken
534SequencedWorkerPool::Inner::GetSequenceToken() {
535  subtle::Atomic32 result =
536      subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1);
537  return SequenceToken(static_cast<int>(result));
538}
539
540SequencedWorkerPool::SequenceToken
541SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
542  AutoLock lock(lock_);
543  return SequenceToken(LockedGetNamedTokenID(name));
544}
545
// Queues |task| to run after |delay| and makes sure a worker will notice it.
//
// If |optional_token_name| is non-null, the task's sequence token is looked up
// (or created) from that name under the lock; otherwise |sequence_token| is
// used.
//
// Returns true if the task was queued. Returns false, without queuing, when
// Shutdown() has already been called and any of these holds:
//   - |shutdown_behavior| is not BLOCK_SHUTDOWN,
//   - the calling thread's current shutdown behavior is CONTINUE_ON_SHUTDOWN
//     (which includes threads that are not workers of this pool), or
//   - the allowance of post-shutdown BLOCK_SHUTDOWN tasks is used up.
546bool SequencedWorkerPool::Inner::PostTask(
547    const std::string* optional_token_name,
548    SequenceToken sequence_token,
549    WorkerShutdown shutdown_behavior,
550    const tracked_objects::Location& from_here,
551    const Closure& task,
552    TimeDelta delay) {
  // Delayed tasks are only accepted with SKIP_ON_SHUTDOWN.
553  DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
554  SequencedTask sequenced(from_here);
555  sequenced.sequence_token_id = sequence_token.id_;
556  sequenced.shutdown_behavior = shutdown_behavior;
557  sequenced.posted_from = from_here;
  // BLOCK_SHUTDOWN tasks are wrapped in a critical closure.
558  sequenced.task =
559      shutdown_behavior == BLOCK_SHUTDOWN ?
560      base::MakeCriticalClosure(task) : task;
561  sequenced.time_to_run = TimeTicks::Now() + delay;
562
  // Nonzero if a new worker thread has to be started once the lock is released.
563  int create_thread_id = 0;
564  {
565    AutoLock lock(lock_);
566    if (shutdown_called_) {
567      if (shutdown_behavior != BLOCK_SHUTDOWN ||
568          LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) {
569        return false;
570      }
571      if (max_blocking_tasks_after_shutdown_ <= 0) {
572        DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
573        return false;
574      }
      // Consume one slot of the post-shutdown allowance.
575      max_blocking_tasks_after_shutdown_ -= 1;
576    }
577
578    // The trace_id is used for identifying the task in about:tracing.
579    sequenced.trace_id = trace_id_++;
580
581    TRACE_EVENT_FLOW_BEGIN0("task", "SequencedWorkerPool::PostTask",
582        TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))));
583
584    sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
585
586    // Now that we have the lock, apply the named token rules.
587    if (optional_token_name)
588      sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
589
590    pending_tasks_.insert(sequenced);
591    if (shutdown_behavior == BLOCK_SHUTDOWN)
592      blocking_shutdown_pending_task_count_++;
593
594    create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
595  }
596
597  // Actually start the additional thread or signal an existing one now that
598  // we're outside the lock.
599  if (create_thread_id)
600    FinishStartingAdditionalThread(create_thread_id);
601  else
602    SignalHasWork();
603
604  return true;
605}
606
607bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
608  AutoLock lock(lock_);
609  return ContainsKey(threads_, PlatformThread::CurrentId());
610}
611
612bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
613    SequenceToken sequence_token) const {
614  AutoLock lock(lock_);
615  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
616  if (found == threads_.end())
617    return false;
618  return found->second->running_sequence().Equals(sequence_token);
619}
620
621// See https://code.google.com/p/chromium/issues/detail?id=168415
622void SequencedWorkerPool::Inner::CleanupForTesting() {
623  DCHECK(!RunsTasksOnCurrentThread());
624  base::ThreadRestrictions::ScopedAllowWait allow_wait;
625  AutoLock lock(lock_);
626  CHECK_EQ(CLEANUP_DONE, cleanup_state_);
627  if (shutdown_called_)
628    return;
629  if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
630    return;
631  cleanup_state_ = CLEANUP_REQUESTED;
632  cleanup_idlers_ = 0;
633  has_work_cv_.Signal();
634  while (cleanup_state_ != CLEANUP_DONE)
635    cleanup_cv_.Wait();
636}
637
638void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
639  SignalHasWork();
640}
641
// Marks the pool as shut down and blocks until CanShutdown() reports true.
//
// |max_new_blocking_tasks_after_shutdown| (must be >= 0) is stored as the
// number of BLOCK_SHUTDOWN tasks that may still be posted afterwards. Calling
// Shutdown() again after it has been called is a no-op. The time spent
// waiting is reported to the "SequencedWorkerPool.ShutdownDelayTime"
// histogram; nothing is reported when no wait was necessary.
642void SequencedWorkerPool::Inner::Shutdown(
643    int max_new_blocking_tasks_after_shutdown) {
644  DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
645  {
646    AutoLock lock(lock_);
647    // Cleanup and Shutdown should not be called concurrently.
648    CHECK_EQ(CLEANUP_DONE, cleanup_state_);
649    if (shutdown_called_)
650      return;
651    shutdown_called_ = true;
652    max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
653
654    // Tickle the threads. This will wake up a waiting one so it will know that
655    // it can exit, which in turn will wake up any other waiting ones.
656    SignalHasWork();
657
658    // There are no pending or running tasks blocking shutdown, we're done.
659    if (CanShutdown())
660      return;
661  }
662
663  // If we're here, then something is blocking shutdown.  So wait for
664  // CanShutdown() to go to true.
665
  // Notify the observer outside the lock, before blocking.
666  if (testing_observer_)
667    testing_observer_->WillWaitForShutdown();
668
669  TimeTicks shutdown_wait_begin = TimeTicks::Now();
670
671  {
672    base::ThreadRestrictions::ScopedAllowWait allow_wait;
673    AutoLock lock(lock_);
    // Re-check the condition after every wakeup.
674    while (!CanShutdown())
675      can_shutdown_cv_.Wait();
676  }
677  UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
678                      TimeTicks::Now() - shutdown_wait_begin);
679}
680
// Body of every worker thread.
//
// Registers |this_worker| in |threads_| and then loops with |lock_| held:
// handle any pending cleanup request, ask GetWork() for a task, and either run
// it (with the lock temporarily released) or wait on |has_work_cv_|. The loop
// ends once Shutdown() has been called and no pending task blocks shutdown;
// on exit the next worker and a possibly waiting Shutdown() are signalled.
681void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
682  {
683    AutoLock lock(lock_);
    // This thread is the one whose creation was in progress; it is now up.
684    DCHECK(thread_being_created_);
685    thread_being_created_ = false;
686    std::pair<ThreadMap::iterator, bool> result =
687        threads_.insert(
688            std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
689    DCHECK(result.second);
690
691    while (true) {
692#if defined(OS_MACOSX)
693      base::mac::ScopedNSAutoreleasePool autorelease_pool;
694#endif
695
696      HandleCleanup();
697
698      // See GetWork for what delete_these_outside_lock is doing.
699      SequencedTask task;
700      TimeDelta wait_time;
701      std::vector<Closure> delete_these_outside_lock;
702      GetWorkStatus status =
703          GetWork(&task, &wait_time, &delete_these_outside_lock);
704      if (status == GET_WORK_FOUND) {
705        TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask",
706            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
707        TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop",
708                     "src_file", task.posted_from.file_name(),
709                     "src_func", task.posted_from.function_name());
710        int new_thread_id = WillRunWorkerTask(task);
711        {
          // Everything in this scope runs with lock_ released.
712          AutoUnlock unlock(lock_);
713          // There may be more work available, so wake up another
714          // worker thread. (Technically not required, since we
715          // already get a signal for each new task, but it doesn't
716          // hurt.)
717          SignalHasWork();
718          delete_these_outside_lock.clear();
719
720          // Complete thread creation outside the lock if necessary.
721          if (new_thread_id)
722            FinishStartingAdditionalThread(new_thread_id);
723
724          this_worker->set_running_task_info(
725              SequenceToken(task.sequence_token_id), task.shutdown_behavior);
726
727          tracked_objects::TrackedTime start_time =
728              tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);
729
730          task.task.Run();
731
732          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
733              start_time, tracked_objects::ThreadData::NowForEndOfRun());
734
735          // Make sure our task is erased outside the lock for the
736          // same reason we do this with delete_these_outside_lock.
737          // Also, do it before calling set_running_task_info() so
738          // that sequence-checking from within the task's destructor
739          // still works.
740          task.task = Closure();
741
742          this_worker->set_running_task_info(
743              SequenceToken(), CONTINUE_ON_SHUTDOWN);
744        }
745        DidRunWorkerTask(task);  // Must be done inside the lock.
746      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        // This thread is performing a CleanupForTesting() pass and GetWork()
        // returned no task to run right now.
747        switch (status) {
748          case GET_WORK_WAIT: {
              // Release the lock only to destroy the collected closures, then
              // go around the loop again.
749              AutoUnlock unlock(lock_);
750              delete_these_outside_lock.clear();
751            }
752            break;
753          case GET_WORK_NOT_FOUND:
            // No tasks are left: move the cleanup to its final phase and wake
            // everybody waiting on cleanup_cv_.
754            CHECK(delete_these_outside_lock.empty());
755            cleanup_state_ = CLEANUP_FINISHING;
756            cleanup_cv_.Broadcast();
757            break;
758          default:
759            NOTREACHED();
760        }
761      } else {
762        // When we're terminating and there's no more work, we can
763        // shut down, other workers can complete any pending or new tasks.
764        // We can get additional tasks posted after shutdown_called_ is set
765        // but only worker threads are allowed to post tasks at that time, and
766        // the workers responsible for posting those tasks will be available
767        // to run them. Also, there may be some tasks stuck behind running
768        // ones with the same sequence token, but additional threads won't
769        // help this case.
770        if (shutdown_called_ &&
771            blocking_shutdown_pending_task_count_ == 0)
772          break;
773        waiting_thread_count_++;
774
        // Sleep until signalled, or at most |wait_time| when a delayed task
        // is pending.
775        switch (status) {
776          case GET_WORK_NOT_FOUND:
777            has_work_cv_.Wait();
778            break;
779          case GET_WORK_WAIT:
780            has_work_cv_.TimedWait(wait_time);
781            break;
782          default:
783            NOTREACHED();
784        }
785        waiting_thread_count_--;
786      }
787    }
788  }  // Release lock_.
789
790  // We noticed we should exit. Wake up the next worker so it knows it should
791  // exit as well (because the Shutdown() code only signals once).
792  SignalHasWork();
793
794  // Possibly unblock shutdown.
795  can_shutdown_cv_.Signal();
796}
797
// Advances the CleanupForTesting() state machine for the calling worker
// thread. Must be called with |lock_| held; it may wait on |cleanup_cv_|.
// Does nothing when no cleanup is in progress (CLEANUP_DONE).
//
// What happens depends on the state found on entry:
//   CLEANUP_REQUESTED: this thread becomes the cleaner. It waits until no
//       thread is being created and all other threads are idling here, then
//       sets CLEANUP_RUNNING and returns.
//   CLEANUP_STARTING:  another thread is the cleaner; this thread counts
//       itself as an idler and waits until the state is CLEANUP_FINISHING.
//   CLEANUP_FINISHING: waits for all idlers to leave, then sets CLEANUP_DONE
//       and signals |cleanup_cv_|.
798void SequencedWorkerPool::Inner::HandleCleanup() {
799  lock_.AssertAcquired();
800  if (cleanup_state_ == CLEANUP_DONE)
801    return;
802  if (cleanup_state_ == CLEANUP_REQUESTED) {
803    // We win, we get to do the cleanup as soon as the others wise up and idle.
804    cleanup_state_ = CLEANUP_STARTING;
    // "- 1" because this thread is itself a member of threads_.
805    while (thread_being_created_ ||
806           cleanup_idlers_ != threads_.size() - 1) {
807      has_work_cv_.Signal();
808      cleanup_cv_.Wait();
809    }
810    cleanup_state_ = CLEANUP_RUNNING;
811    return;
812  }
813  if (cleanup_state_ == CLEANUP_STARTING) {
814    // Another worker thread is cleaning up, we idle here until that's done.
815    ++cleanup_idlers_;
816    cleanup_cv_.Broadcast();
817    while (cleanup_state_ != CLEANUP_FINISHING) {
818      cleanup_cv_.Wait();
819    }
820    --cleanup_idlers_;
821    cleanup_cv_.Broadcast();
822    return;
823  }
824  if (cleanup_state_ == CLEANUP_FINISHING) {
825    // We wait for all idlers to wake up prior to being DONE.
826    while (cleanup_idlers_ != 0) {
827      cleanup_cv_.Broadcast();
828      cleanup_cv_.Wait();
829    }
    // Re-check: the state may have changed while we were waiting.
830    if (cleanup_state_ == CLEANUP_FINISHING) {
831      cleanup_state_ = CLEANUP_DONE;
832      cleanup_cv_.Signal();
833    }
834    return;
835  }
836}
837
838int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
839    const std::string& name) {
840  lock_.AssertAcquired();
841  DCHECK(!name.empty());
842
843  std::map<std::string, int>::const_iterator found =
844      named_sequence_tokens_.find(name);
845  if (found != named_sequence_tokens_.end())
846    return found->second;  // Got an existing one.
847
848  // Create a new one for this name.
849  SequenceToken result = GetSequenceToken();
850  named_sequence_tokens_.insert(std::make_pair(name, result.id_));
851  return result.id_;
852}
853
854int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
855  lock_.AssertAcquired();
856  // We assume that we never create enough tasks to wrap around.
857  return next_sequence_task_number_++;
858}
859
860SequencedWorkerPool::WorkerShutdown
861SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
862  lock_.AssertAcquired();
863  ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
864  if (found == threads_.end())
865    return CONTINUE_ON_SHUTDOWN;
866  return found->second->running_shutdown_behavior();
867}
868
// Scans |pending_tasks_| from the front for the next piece of work. Must be
// called with |lock_| held.
//
//   task    - receives a copy of the selected task when GET_WORK_FOUND is
//             returned; not written otherwise.
//   wait_time - receives the remaining delay of the first runnable-sequence
//             task whose |time_to_run| is still in the future; written only
//             when GET_WORK_WAIT is returned.
//   delete_these_outside_lock - closures of tasks discarded by this call are
//             appended here so they can be destroyed after the lock is
//             released.
//
// Returns GET_WORK_FOUND when a task was dequeued into |task|, GET_WORK_WAIT
// when the scan stopped at a task that is not yet due, and GET_WORK_NOT_FOUND
// when the scan reached the end of the queue without either.
SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
    SequencedTask* task,
    TimeDelta* wait_time,
    std::vector<Closure>* delete_these_outside_lock) {
  lock_.AssertAcquired();

  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
                           static_cast<int>(pending_tasks_.size()));

  // Find the next task with a sequence token that's not currently in use.
  // If the token is in use, that means another thread is running something
  // in that sequence, and we can't run it without going out-of-order.
  //
  // This algorithm is simple and fair, but inefficient in some cases. For
  // example, say somebody schedules 1000 slow tasks with the same sequence
  // number. We'll have to go through all those tasks each time we feel like
  // there might be work to schedule. If this proves to be a problem, we
  // should make this more efficient.
  //
  // One possible enhancement would be to keep a map from sequence ID to a
  // list of pending but currently blocked SequencedTasks for that ID.
  // When a worker finishes a task of one sequence token, it can pick up the
  // next one from that token right away.
  //
  // This may lead to starvation if there are sufficient numbers of sequences
  // in use. To alleviate this, we could add an incrementing priority counter
  // to each SequencedTask. Then maintain a priority_queue of all runnable
  // tasks, sorted by priority counter. When a sequenced task is completed
  // we would pop the head element off of that task's pending list and add it
  // to the priority queue. Then we would run the first item in the priority
  // queue.

  GetWorkStatus status = GET_WORK_NOT_FOUND;
  int unrunnable_tasks = 0;
  PendingTaskSet::iterator i = pending_tasks_.begin();
  // We assume that the loop below doesn't take too long and so we can just do
  // a single call to TimeTicks::Now().
  const TimeTicks current_time = TimeTicks::Now();
  while (i != pending_tasks_.end()) {
    // Tasks whose sequence is busy are counted and left in the queue.
    if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
      unrunnable_tasks++;
      ++i;
      continue;
    }

    if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
      // We're shutting down and the task we just found isn't blocking
      // shutdown. Delete it and get more work.
      //
      // Note that we do not want to delete unrunnable tasks. Deleting a task
      // can have side effects (like freeing some objects) and deleting a
      // task that's supposed to run after one that's currently running could
      // cause an obscure crash.
      //
      // We really want to delete these tasks outside the lock in case the
      // closures are holding refs to objects that want to post work from
      // their destructors (which would deadlock). The closures are
      // internally refcounted, so we just need to keep a copy of them alive
      // until the lock is exited. The calling code can just clear() the
      // vector they passed to us once the lock is exited to make this
      // happen.
      delete_these_outside_lock->push_back(i->task);
      // Post-increment so |i| already points at the next element when the
      // current one is erased.
      pending_tasks_.erase(i++);
      continue;
    }

    if (i->time_to_run > current_time) {
      // The time to run has not come yet.
      // NOTE(review): stopping at the first not-yet-due task presumably
      // relies on |pending_tasks_| being ordered by time to run - confirm.
      *wait_time = i->time_to_run - current_time;
      status = GET_WORK_WAIT;
      if (cleanup_state_ == CLEANUP_RUNNING) {
        // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
        // The status stays GET_WORK_WAIT even though the task is dropped.
        delete_these_outside_lock->push_back(i->task);
        pending_tasks_.erase(i);
      }
      break;
    }

    // Found a runnable task.
    *task = *i;
    pending_tasks_.erase(i);
    // A dequeued BLOCK_SHUTDOWN task no longer counts as pending.
    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
      blocking_shutdown_pending_task_count_--;
    }

    status = GET_WORK_FOUND;
    break;
  }

  // Track the number of tasks we had to skip over to see if we should be
  // making this more efficient. If this number ever becomes large or is
  // frequently "some", we should consider the optimization above.
  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
                           unrunnable_tasks);
  return status;
}
965
966int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
967  lock_.AssertAcquired();
968
969  // Mark the task's sequence number as in use.
970  if (task.sequence_token_id)
971    current_sequences_.insert(task.sequence_token_id);
972
973  // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
974  // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
975  // completes.
976  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
977    blocking_shutdown_thread_count_++;
978
979  // We just picked up a task. Since StartAdditionalThreadIfHelpful only
980  // creates a new thread if there is no free one, there is a race when posting
981  // tasks that many tasks could have been posted before a thread started
982  // running them, so only one thread would have been created. So we also check
983  // whether we should create more threads after removing our task from the
984  // queue, which also has the nice side effect of creating the workers from
985  // background threads rather than the main thread of the app.
986  //
987  // If another thread wasn't created, we want to wake up an existing thread
988  // if there is one waiting to pick up the next task.
989  //
990  // Note that we really need to do this *before* running the task, not
991  // after. Otherwise, if more than one task is posted, the creation of the
992  // second thread (since we only create one at a time) will be blocked by
993  // the execution of the first task, which could be arbitrarily long.
994  return PrepareToStartAdditionalThreadIfHelpful();
995}
996
997void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
998  lock_.AssertAcquired();
999
1000  if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1001    DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1002    blocking_shutdown_thread_count_--;
1003  }
1004
1005  if (task.sequence_token_id)
1006    current_sequences_.erase(task.sequence_token_id);
1007}
1008
1009bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1010    int sequence_token_id) const {
1011  lock_.AssertAcquired();
1012  return !sequence_token_id ||
1013      current_sequences_.find(sequence_token_id) ==
1014          current_sequences_.end();
1015}
1016
1017int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1018  lock_.AssertAcquired();
1019  // How thread creation works:
1020  //
1021  // We'de like to avoid creating threads with the lock held. However, we
1022  // need to be sure that we have an accurate accounting of the threads for
1023  // proper Joining and deltion on shutdown.
1024  //
1025  // We need to figure out if we need another thread with the lock held, which
1026  // is what this function does. It then marks us as in the process of creating
1027  // a thread. When we do shutdown, we wait until the thread_being_created_
1028  // flag is cleared, which ensures that the new thread is properly added to
1029  // all the data structures and we can't leak it. Once shutdown starts, we'll
1030  // refuse to create more threads or they would be leaked.
1031  //
1032  // Note that this creates a mostly benign race condition on shutdown that
1033  // will cause fewer workers to be created than one would expect. It isn't
1034  // much of an issue in real life, but affects some tests. Since we only spawn
1035  // one worker at a time, the following sequence of events can happen:
1036  //
1037  //  1. Main thread posts a bunch of unrelated tasks that would normally be
1038  //     run on separate threads.
1039  //  2. The first task post causes us to start a worker. Other tasks do not
1040  //     cause a worker to start since one is pending.
1041  //  3. Main thread initiates shutdown.
1042  //  4. No more threads are created since the shutdown_called_ flag is set.
1043  //
1044  // The result is that one may expect that max_threads_ workers to be created
1045  // given the workload, but in reality fewer may be created because the
1046  // sequence of thread creation on the background threads is racing with the
1047  // shutdown call.
1048  if (!shutdown_called_ &&
1049      !thread_being_created_ &&
1050      cleanup_state_ == CLEANUP_DONE &&
1051      threads_.size() < max_threads_ &&
1052      waiting_thread_count_ == 0) {
1053    // We could use an additional thread if there's work to be done.
1054    for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1055         i != pending_tasks_.end(); ++i) {
1056      if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1057        // Found a runnable task, mark the thread as being started.
1058        thread_being_created_ = true;
1059        return static_cast<int>(threads_.size() + 1);
1060      }
1061    }
1062  }
1063  return 0;
1064}
1065
1066void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1067    int thread_number) {
1068  // Called outside of the lock.
1069  DCHECK(thread_number > 0);
1070
1071  // The worker is assigned to the list when the thread actually starts, which
1072  // will manage the memory of the pointer.
1073  new Worker(worker_pool_, thread_number, thread_name_prefix_);
1074}
1075
1076void SequencedWorkerPool::Inner::SignalHasWork() {
1077  has_work_cv_.Signal();
1078  if (testing_observer_) {
1079    testing_observer_->OnHasWork();
1080  }
1081}
1082
1083bool SequencedWorkerPool::Inner::CanShutdown() const {
1084  lock_.AssertAcquired();
1085  // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1086  return !thread_being_created_ &&
1087         blocking_shutdown_thread_count_ == 0 &&
1088         blocking_shutdown_pending_task_count_ == 0;
1089}
1090
1091// SequencedWorkerPool --------------------------------------------------------
1092
// Builds a pool without a testing observer. Records the message loop proxy
// that is current at construction time and creates the Inner implementation
// with |max_threads|, |thread_name_prefix| and a NULL observer.
SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
}
1099
// Same as the two-argument constructor, except that |observer| is handed to
// the Inner implementation as its testing observer.
SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
}
1107
// Out-of-line destructor; the body is intentionally empty.
SequencedWorkerPool::~SequencedWorkerPool() {}
1109
1110void SequencedWorkerPool::OnDestruct() const {
1111  DCHECK(constructor_message_loop_.get());
1112  // Avoid deleting ourselves on a worker thread (which would
1113  // deadlock).
1114  if (RunsTasksOnCurrentThread()) {
1115    constructor_message_loop_->DeleteSoon(FROM_HERE, this);
1116  } else {
1117    delete this;
1118  }
1119}
1120
// Returns the sequence token produced by Inner::GetSequenceToken().
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
  return inner_->GetSequenceToken();
}
1124
// Returns the sequence token that Inner::GetNamedSequenceToken() associates
// with |name|.
SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
    const std::string& name) {
  return inner_->GetNamedSequenceToken(name);
}
1129
// Returns a sequenced task runner for |token| that uses BLOCK_SHUTDOWN as its
// shutdown behavior.
scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
    SequenceToken token) {
  return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
}
1134
// Creates a new SequencedWorkerPoolSequencedTaskRunner bound to this pool,
// |token| and |shutdown_behavior|.
scoped_refptr<SequencedTaskRunner>
SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
    SequenceToken token, WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolSequencedTaskRunner(
      this, token, shutdown_behavior);
}
1141
// Creates a new SequencedWorkerPoolTaskRunner bound to this pool and
// |shutdown_behavior|.
scoped_refptr<TaskRunner>
SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
    WorkerShutdown shutdown_behavior) {
  return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
}
1147
// Posts |task| with no token name, a default-constructed SequenceToken,
// BLOCK_SHUTDOWN behavior and a zero delay. Returns the result of
// Inner::PostTask().
bool SequencedWorkerPool::PostWorkerTask(
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}
1154
1155bool SequencedWorkerPool::PostDelayedWorkerTask(
1156    const tracked_objects::Location& from_here,
1157    const Closure& task,
1158    TimeDelta delay) {
1159  WorkerShutdown shutdown_behavior =
1160      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1161  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1162                          from_here, task, delay);
1163}
1164
// Posts |task| with no token name, a default-constructed SequenceToken, the
// caller-supplied |shutdown_behavior| and a zero delay. Returns the result of
// Inner::PostTask().
bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
                          from_here, task, TimeDelta());
}
1172
// Posts |task| under |sequence_token| with BLOCK_SHUTDOWN behavior and a zero
// delay. Returns the result of Inner::PostTask().
bool SequencedWorkerPool::PostSequencedWorkerTask(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}
1180
1181bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1182    SequenceToken sequence_token,
1183    const tracked_objects::Location& from_here,
1184    const Closure& task,
1185    TimeDelta delay) {
1186  WorkerShutdown shutdown_behavior =
1187      delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1188  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1189                          from_here, task, delay);
1190}
1191
// Posts |task| under the sequence identified by |token_name| (which must not
// be empty), with BLOCK_SHUTDOWN behavior and a zero delay. The name is passed
// by pointer together with a default-constructed SequenceToken. Returns the
// result of Inner::PostTask().
bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
    const std::string& token_name,
    const tracked_objects::Location& from_here,
    const Closure& task) {
  DCHECK(!token_name.empty());
  return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
                          from_here, task, TimeDelta());
}
1200
// Posts |task| under |sequence_token| with the caller-supplied
// |shutdown_behavior| and a zero delay. Returns the result of
// Inner::PostTask().
bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
    SequenceToken sequence_token,
    const tracked_objects::Location& from_here,
    const Closure& task,
    WorkerShutdown shutdown_behavior) {
  return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
                          from_here, task, TimeDelta());
}
1209
// Forwards to PostDelayedWorkerTask() with the same arguments and returns its
// result.
bool SequencedWorkerPool::PostDelayedTask(
    const tracked_objects::Location& from_here,
    const Closure& task,
    TimeDelta delay) {
  return PostDelayedWorkerTask(from_here, task, delay);
}
1216
// Returns the result of Inner::RunsTasksOnCurrentThread().
bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
  return inner_->RunsTasksOnCurrentThread();
}
1220
// Returns the result of Inner::IsRunningSequenceOnCurrentThread() for
// |sequence_token|.
bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
    SequenceToken sequence_token) const {
  return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
}
1225
// Test-only helper; delegates to Inner::CleanupForTesting().
void SequencedWorkerPool::FlushForTesting() {
  inner_->CleanupForTesting();
}
1229
// Test-only helper; delegates to Inner::SignalHasWorkForTesting().
void SequencedWorkerPool::SignalHasWorkForTesting() {
  inner_->SignalHasWorkForTesting();
}
1233
// Shuts the pool down by forwarding |max_new_blocking_tasks_after_shutdown|
// to Inner::Shutdown(). In debug builds, checks that the call is made on the
// message loop that was current when the pool was constructed.
void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
  DCHECK(constructor_message_loop_->BelongsToCurrentThread());
  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}
1238
1239}  // namespace base
1240