worker_pool_posix.cc revision b636ff6a8ac3b54b3067289f01848252ab71eceb
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/worker_pool_posix.h"
6
7#include "base/bind.h"
8#include "base/callback.h"
9#include "base/lazy_instance.h"
10#include "base/logging.h"
11#include "base/memory/ref_counted.h"
12#include "base/strings/stringprintf.h"
13#include "base/threading/platform_thread.h"
14#include "base/threading/thread_local.h"
15#include "base/threading/worker_pool.h"
16#include "base/trace_event/trace_event.h"
17#include "base/tracked_objects.h"
18
19using tracked_objects::TrackedTime;
20
21namespace base {
22
23namespace {
24
25base::LazyInstance<ThreadLocalBoolean>::Leaky
26    g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER;
27
// Idle timeout, in seconds (ten minutes), handed to the PosixDynamicThreadPool
// that WorkerPoolImpl creates.
const int kIdleSecondsBeforeExit = 60 * 10;
29
30class WorkerPoolImpl {
31 public:
32  WorkerPoolImpl();
33  ~WorkerPoolImpl();
34
35  void PostTask(const tracked_objects::Location& from_here,
36                const base::Closure& task, bool task_is_slow);
37
38 private:
39  scoped_refptr<base::PosixDynamicThreadPool> pool_;
40};
41
42WorkerPoolImpl::WorkerPoolImpl()
43    : pool_(new base::PosixDynamicThreadPool("WorkerPool",
44                                             kIdleSecondsBeforeExit)) {
45}
46
47WorkerPoolImpl::~WorkerPoolImpl() {
48  pool_->Terminate();
49}
50
51void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here,
52                              const base::Closure& task,
53                              bool /* task_is_slow */) {
54  pool_->PostTask(from_here, task);
55}
56
57base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool =
58    LAZY_INSTANCE_INITIALIZER;
59
60class WorkerThread : public PlatformThread::Delegate {
61 public:
62  WorkerThread(const std::string& name_prefix,
63               base::PosixDynamicThreadPool* pool)
64      : name_prefix_(name_prefix),
65        pool_(pool) {}
66
67  void ThreadMain() override;
68
69 private:
70  const std::string name_prefix_;
71  scoped_refptr<base::PosixDynamicThreadPool> pool_;
72
73  DISALLOW_COPY_AND_ASSIGN(WorkerThread);
74};
75
76void WorkerThread::ThreadMain() {
77  g_worker_pool_running_on_this_thread.Get().Set(true);
78  const std::string name = base::StringPrintf(
79      "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId());
80  // Note |name.c_str()| must remain valid for for the whole life of the thread.
81  PlatformThread::SetName(name);
82
83  for (;;) {
84    PendingTask pending_task = pool_->WaitForTask();
85    if (pending_task.task.is_null())
86      break;
87    TRACE_EVENT2("toplevel", "WorkerThread::ThreadMain::Run",
88        "src_file", pending_task.posted_from.file_name(),
89        "src_func", pending_task.posted_from.function_name());
90
91    tracked_objects::TaskStopwatch stopwatch;
92    stopwatch.Start();
93    pending_task.task.Run();
94    stopwatch.Stop();
95
96    tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking(
97        pending_task.birth_tally, pending_task.time_posted, stopwatch);
98  }
99
100  // The WorkerThread is non-joinable, so it deletes itself.
101  delete this;
102}
103
104}  // namespace
105
106// static
107bool WorkerPool::PostTask(const tracked_objects::Location& from_here,
108                          const base::Closure& task, bool task_is_slow) {
109  g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow);
110  return true;
111}
112
113// static
114bool WorkerPool::RunsTasksOnCurrentThread() {
115  return g_worker_pool_running_on_this_thread.Get().Get();
116}
117
118PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix,
119                                               int idle_seconds_before_exit)
120    : name_prefix_(name_prefix),
121      idle_seconds_before_exit_(idle_seconds_before_exit),
122      pending_tasks_available_cv_(&lock_),
123      num_idle_threads_(0),
124      terminated_(false) {}
125
126PosixDynamicThreadPool::~PosixDynamicThreadPool() {
127  while (!pending_tasks_.empty())
128    pending_tasks_.pop();
129}
130
131void PosixDynamicThreadPool::Terminate() {
132  {
133    AutoLock locked(lock_);
134    DCHECK(!terminated_) << "Thread pool is already terminated.";
135    terminated_ = true;
136  }
137  pending_tasks_available_cv_.Broadcast();
138}
139
140void PosixDynamicThreadPool::PostTask(
141    const tracked_objects::Location& from_here,
142    const base::Closure& task) {
143  PendingTask pending_task(from_here, task);
144  AddTask(&pending_task);
145}
146
147void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) {
148  AutoLock locked(lock_);
149  DCHECK(!terminated_) <<
150      "This thread pool is already terminated.  Do not post new tasks.";
151
152  pending_tasks_.push(*pending_task);
153  pending_task->task.Reset();
154
155  // We have enough worker threads.
156  if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) {
157    pending_tasks_available_cv_.Signal();
158  } else {
159    // The new PlatformThread will take ownership of the WorkerThread object,
160    // which will delete itself on exit.
161    WorkerThread* worker =
162        new WorkerThread(name_prefix_, this);
163    PlatformThread::CreateNonJoinable(0, worker);
164  }
165}
166
167PendingTask PosixDynamicThreadPool::WaitForTask() {
168  AutoLock locked(lock_);
169
170  if (terminated_)
171    return PendingTask(FROM_HERE, base::Closure());
172
173  if (pending_tasks_.empty()) {  // No work available, wait for work.
174    num_idle_threads_++;
175    if (num_idle_threads_cv_.get())
176      num_idle_threads_cv_->Signal();
177    pending_tasks_available_cv_.TimedWait(
178        TimeDelta::FromSeconds(idle_seconds_before_exit_));
179    num_idle_threads_--;
180    if (num_idle_threads_cv_.get())
181      num_idle_threads_cv_->Signal();
182    if (pending_tasks_.empty()) {
183      // We waited for work, but there's still no work.  Return NULL to signal
184      // the thread to terminate.
185      return PendingTask(FROM_HERE, base::Closure());
186    }
187  }
188
189  PendingTask pending_task = pending_tasks_.front();
190  pending_tasks_.pop();
191  return pending_task;
192}
193
194}  // namespace base
195