1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/worker_pool_posix.h"
6
7#include <stddef.h>
8
9#include "base/bind.h"
10#include "base/callback.h"
11#include "base/lazy_instance.h"
12#include "base/logging.h"
13#include "base/macros.h"
14#include "base/memory/ref_counted.h"
15#include "base/strings/stringprintf.h"
16#include "base/threading/platform_thread.h"
17#include "base/threading/thread_local.h"
18#include "base/threading/worker_pool.h"
19#include "base/trace_event/trace_event.h"
20#include "base/tracked_objects.h"
21
22using tracked_objects::TrackedTime;
23
24namespace base {
25
26namespace {
27
28base::LazyInstance<ThreadLocalBoolean>::Leaky
29    g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER;
30
// Idle timeout, in seconds, handed to the WorkerPool's PosixDynamicThreadPool:
// a worker thread that waits this long without receiving a task exits.
// The value is ten minutes.
constexpr int kIdleSecondsBeforeExit = 10 * 60;
32
33class WorkerPoolImpl {
34 public:
35  WorkerPoolImpl();
36  ~WorkerPoolImpl();
37
38  void PostTask(const tracked_objects::Location& from_here,
39                const base::Closure& task,
40                bool task_is_slow);
41
42 private:
43  scoped_refptr<base::PosixDynamicThreadPool> pool_;
44};
45
46WorkerPoolImpl::WorkerPoolImpl()
47    : pool_(new base::PosixDynamicThreadPool("WorkerPool",
48                                             kIdleSecondsBeforeExit)) {}
49
50WorkerPoolImpl::~WorkerPoolImpl() {
51  pool_->Terminate();
52}
53
54void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
55                              const base::Closure& task,
56                              bool /*task_is_slow*/) {
57  pool_->PostTask(from_here, task);
58}
59
60base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool =
61    LAZY_INSTANCE_INITIALIZER;
62
63class WorkerThread : public PlatformThread::Delegate {
64 public:
65  WorkerThread(const std::string& name_prefix,
66               base::PosixDynamicThreadPool* pool)
67      : name_prefix_(name_prefix), pool_(pool) {}
68
69  void ThreadMain() override;
70
71 private:
72  const std::string name_prefix_;
73  scoped_refptr<base::PosixDynamicThreadPool> pool_;
74
75  DISALLOW_COPY_AND_ASSIGN(WorkerThread);
76};
77
78void WorkerThread::ThreadMain() {
79  g_worker_pool_running_on_this_thread.Get().Set(true);
80  const std::string name = base::StringPrintf("%s/%d", name_prefix_.c_str(),
81                                              PlatformThread::CurrentId());
82  // Note |name.c_str()| must remain valid for for the whole life of the thread.
83  PlatformThread::SetName(name);
84
85  for (;;) {
86    PendingTask pending_task = pool_->WaitForTask();
87    if (pending_task.task.is_null())
88      break;
89    TRACE_TASK_EXECUTION("WorkerThread::ThreadMain::Run", pending_task);
90
91    tracked_objects::TaskStopwatch stopwatch;
92    stopwatch.Start();
93    pending_task.task.Run();
94    stopwatch.Stop();
95
96    tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking(
97        pending_task.birth_tally, pending_task.time_posted, stopwatch);
98  }
99
100  // The WorkerThread is non-joinable, so it deletes itself.
101  delete this;
102}
103
104}  // namespace
105
106// static
107bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
108                          const base::Closure& task,
109                          bool task_is_slow) {
110  g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow);
111  return true;
112}
113
114// static
115bool WorkerPool::RunsTasksOnCurrentThread() {
116  return g_worker_pool_running_on_this_thread.Get().Get();
117}
118
119PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix,
120                                               int idle_seconds_before_exit)
121    : name_prefix_(name_prefix),
122      idle_seconds_before_exit_(idle_seconds_before_exit),
123      pending_tasks_available_cv_(&lock_),
124      num_idle_threads_(0),
125      terminated_(false) {}
126
127PosixDynamicThreadPool::~PosixDynamicThreadPool() {
128  while (!pending_tasks_.empty())
129    pending_tasks_.pop();
130}
131
132void PosixDynamicThreadPool::Terminate() {
133  {
134    AutoLock locked(lock_);
135    DCHECK(!terminated_) << "Thread pool is already terminated.";
136    terminated_ = true;
137  }
138  pending_tasks_available_cv_.Broadcast();
139}
140
141void PosixDynamicThreadPool::PostTask(
142    const tracked_objects::Location& from_here,
143    const base::Closure& task) {
144  PendingTask pending_task(from_here, task);
145  AddTask(&pending_task);
146}
147
148void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) {
149  AutoLock locked(lock_);
150  DCHECK(!terminated_)
151      << "This thread pool is already terminated.  Do not post new tasks.";
152
153  pending_tasks_.push(std::move(*pending_task));
154
155  // We have enough worker threads.
156  if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) {
157    pending_tasks_available_cv_.Signal();
158  } else {
159    // The new PlatformThread will take ownership of the WorkerThread object,
160    // which will delete itself on exit.
161    WorkerThread* worker = new WorkerThread(name_prefix_, this);
162    PlatformThread::CreateNonJoinable(0, worker);
163  }
164}
165
166PendingTask PosixDynamicThreadPool::WaitForTask() {
167  AutoLock locked(lock_);
168
169  if (terminated_)
170    return PendingTask(FROM_HERE, base::Closure());
171
172  if (pending_tasks_.empty()) {  // No work available, wait for work.
173    num_idle_threads_++;
174    if (num_idle_threads_cv_.get())
175      num_idle_threads_cv_->Signal();
176    pending_tasks_available_cv_.TimedWait(
177        TimeDelta::FromSeconds(idle_seconds_before_exit_));
178    num_idle_threads_--;
179    if (num_idle_threads_cv_.get())
180      num_idle_threads_cv_->Signal();
181    if (pending_tasks_.empty()) {
182      // We waited for work, but there's still no work.  Return NULL to signal
183      // the thread to terminate.
184      return PendingTask(FROM_HERE, base::Closure());
185    }
186  }
187
188  PendingTask pending_task = std::move(pending_tasks_.front());
189  pending_tasks_.pop();
190  return pending_task;
191}
192
193}  // namespace base
194