1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/worker_pool_posix.h"
6
7#include <stddef.h>
8
9#include "base/bind.h"
10#include "base/callback.h"
11#include "base/lazy_instance.h"
12#include "base/logging.h"
13#include "base/macros.h"
14#include "base/memory/ref_counted.h"
15#include "base/strings/stringprintf.h"
16#include "base/threading/platform_thread.h"
17#include "base/threading/thread_local.h"
18#include "base/threading/worker_pool.h"
19#include "base/trace_event/trace_event.h"
20#include "base/tracked_objects.h"
21
22using tracked_objects::TrackedTime;
23
24namespace base {
25
26namespace {
27
28base::LazyInstance<ThreadLocalBoolean>::Leaky
29    g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER;
30
// How long, in seconds, an idle worker of the default pool waits for new work
// before it exits: ten minutes.
const int kIdleSecondsBeforeExit = 60 * 10;
32
33class WorkerPoolImpl {
34 public:
35  WorkerPoolImpl();
36  ~WorkerPoolImpl();
37
38  void PostTask(const tracked_objects::Location& from_here,
39                const base::Closure& task,
40                bool task_is_slow);
41
42 private:
43  scoped_refptr<base::PosixDynamicThreadPool> pool_;
44};
45
46WorkerPoolImpl::WorkerPoolImpl()
47    : pool_(new base::PosixDynamicThreadPool("WorkerPool",
48                                             kIdleSecondsBeforeExit)) {}
49
50WorkerPoolImpl::~WorkerPoolImpl() {
51  pool_->Terminate();
52}
53
54void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
55                              const base::Closure& task,
56                              bool /* task_is_slow */) {
57  pool_->PostTask(from_here, task);
58}
59
60base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool =
61    LAZY_INSTANCE_INITIALIZER;
62
63class WorkerThread : public PlatformThread::Delegate {
64 public:
65  WorkerThread(const std::string& name_prefix,
66               base::PosixDynamicThreadPool* pool)
67      : name_prefix_(name_prefix), pool_(pool) {}
68
69  void ThreadMain() override;
70
71 private:
72  const std::string name_prefix_;
73  scoped_refptr<base::PosixDynamicThreadPool> pool_;
74
75  DISALLOW_COPY_AND_ASSIGN(WorkerThread);
76};
77
78void WorkerThread::ThreadMain() {
79  g_worker_pool_running_on_this_thread.Get().Set(true);
80  const std::string name = base::StringPrintf("%s/%d", name_prefix_.c_str(),
81                                              PlatformThread::CurrentId());
82  // Note |name.c_str()| must remain valid for for the whole life of the thread.
83  PlatformThread::SetName(name);
84
85  for (;;) {
86    PendingTask pending_task = pool_->WaitForTask();
87    if (pending_task.task.is_null())
88      break;
89    TRACE_EVENT2("toplevel", "WorkerThread::ThreadMain::Run",
90        "src_file", pending_task.posted_from.file_name(),
91        "src_func", pending_task.posted_from.function_name());
92
93    tracked_objects::TaskStopwatch stopwatch;
94    stopwatch.Start();
95    pending_task.task.Run();
96    stopwatch.Stop();
97
98    tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking(
99        pending_task.birth_tally, pending_task.time_posted, stopwatch);
100  }
101
102  // The WorkerThread is non-joinable, so it deletes itself.
103  delete this;
104}
105
106}  // namespace
107
108// static
109bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
110                          const base::Closure& task,
111                          bool task_is_slow) {
112  g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow);
113  return true;
114}
115
116// static
117bool WorkerPool::RunsTasksOnCurrentThread() {
118  return g_worker_pool_running_on_this_thread.Get().Get();
119}
120
121PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix,
122                                               int idle_seconds_before_exit)
123    : name_prefix_(name_prefix),
124      idle_seconds_before_exit_(idle_seconds_before_exit),
125      pending_tasks_available_cv_(&lock_),
126      num_idle_threads_(0),
127      terminated_(false) {}
128
129PosixDynamicThreadPool::~PosixDynamicThreadPool() {
130  while (!pending_tasks_.empty())
131    pending_tasks_.pop();
132}
133
134void PosixDynamicThreadPool::Terminate() {
135  {
136    AutoLock locked(lock_);
137    DCHECK(!terminated_) << "Thread pool is already terminated.";
138    terminated_ = true;
139  }
140  pending_tasks_available_cv_.Broadcast();
141}
142
143void PosixDynamicThreadPool::PostTask(
144    const tracked_objects::Location& from_here,
145    const base::Closure& task) {
146  PendingTask pending_task(from_here, task);
147  AddTask(&pending_task);
148}
149
150void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) {
151  AutoLock locked(lock_);
152  DCHECK(!terminated_)
153      << "This thread pool is already terminated.  Do not post new tasks.";
154
155  pending_tasks_.push(*pending_task);
156  pending_task->task.Reset();
157
158  // We have enough worker threads.
159  if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) {
160    pending_tasks_available_cv_.Signal();
161  } else {
162    // The new PlatformThread will take ownership of the WorkerThread object,
163    // which will delete itself on exit.
164    WorkerThread* worker = new WorkerThread(name_prefix_, this);
165    PlatformThread::CreateNonJoinable(0, worker);
166  }
167}
168
169PendingTask PosixDynamicThreadPool::WaitForTask() {
170  AutoLock locked(lock_);
171
172  if (terminated_)
173    return PendingTask(FROM_HERE, base::Closure());
174
175  if (pending_tasks_.empty()) {  // No work available, wait for work.
176    num_idle_threads_++;
177    if (num_idle_threads_cv_.get())
178      num_idle_threads_cv_->Signal();
179    pending_tasks_available_cv_.TimedWait(
180        TimeDelta::FromSeconds(idle_seconds_before_exit_));
181    num_idle_threads_--;
182    if (num_idle_threads_cv_.get())
183      num_idle_threads_cv_->Signal();
184    if (pending_tasks_.empty()) {
185      // We waited for work, but there's still no work.  Return NULL to signal
186      // the thread to terminate.
187      return PendingTask(FROM_HERE, base::Closure());
188    }
189  }
190
191  PendingTask pending_task = pending_tasks_.front();
192  pending_tasks_.pop();
193  return pending_task;
194}
195
196}  // namespace base
197