1b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov// Use of this source code is governed by a BSD-style license that can be 3b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov// found in the LICENSE file. 4b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov 5b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/threading/worker_pool_posix.h" 6b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov 7b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/bind.h" 8b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/callback.h" 9b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/debug/trace_event.h" 10b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/lazy_instance.h" 11b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/logging.h" 12b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/memory/ref_counted.h" 13b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/strings/stringprintf.h" 14b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/threading/platform_thread.h" 15b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/threading/thread_local.h" 16b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/threading/worker_pool.h" 17b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov#include "base/tracked_objects.h" 18b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov 19b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanovusing tracked_objects::TrackedTime; 20b32f58018498ea2225959b0ba11c18f0c433deefEvgeniy Stepanov 21namespace base { 22 23namespace { 24 25base::LazyInstance<ThreadLocalBoolean>::Leaky 26 g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER; 27 28const int kIdleSecondsBeforeExit = 10 * 60; 29 30#ifdef ADDRESS_SANITIZER 31const int kWorkerThreadStackSize = 256 * 1024; 32#else 33// A stack size of 64 KB is too small for the CERT_PKIXVerifyCert 34// function of NSS because of NSS bug 439169. 35const int kWorkerThreadStackSize = 128 * 1024; 36#endif 37 38class WorkerPoolImpl { 39 public: 40 WorkerPoolImpl(); 41 ~WorkerPoolImpl(); 42 43 void PostTask(const tracked_objects::Location& from_here, 44 const base::Closure& task, bool task_is_slow); 45 46 private: 47 scoped_refptr<base::PosixDynamicThreadPool> pool_; 48}; 49 50WorkerPoolImpl::WorkerPoolImpl() 51 : pool_(new base::PosixDynamicThreadPool("WorkerPool", 52 kIdleSecondsBeforeExit)) { 53} 54 55WorkerPoolImpl::~WorkerPoolImpl() { 56 pool_->Terminate(); 57} 58 59void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, 60 const base::Closure& task, bool task_is_slow) { 61 pool_->PostTask(from_here, task); 62} 63 64base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = 65 LAZY_INSTANCE_INITIALIZER; 66 67class WorkerThread : public PlatformThread::Delegate { 68 public: 69 WorkerThread(const std::string& name_prefix, 70 base::PosixDynamicThreadPool* pool) 71 : name_prefix_(name_prefix), 72 pool_(pool) {} 73 74 virtual void ThreadMain() OVERRIDE; 75 76 private: 77 const std::string name_prefix_; 78 scoped_refptr<base::PosixDynamicThreadPool> pool_; 79 80 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 81}; 82 83void WorkerThread::ThreadMain() { 84 g_worker_pool_running_on_this_thread.Get().Set(true); 85 const std::string name = base::StringPrintf( 86 "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); 87 // Note |name.c_str()| must remain valid for for the whole life of the thread. 88 PlatformThread::SetName(name.c_str()); 89 90 for (;;) { 91 PendingTask pending_task = pool_->WaitForTask(); 92 if (pending_task.task.is_null()) 93 break; 94 TRACE_EVENT2("toplevel", "WorkerThread::ThreadMain::Run", 95 "src_file", pending_task.posted_from.file_name(), 96 "src_func", pending_task.posted_from.function_name()); 97 98 tracked_objects::ThreadData::PrepareForStartOfRun(pending_task.birth_tally); 99 tracked_objects::TaskStopwatch stopwatch; 100 pending_task.task.Run(); 101 stopwatch.Stop(); 102 103 tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking( 104 pending_task.birth_tally, TrackedTime(pending_task.time_posted), 105 stopwatch); 106 } 107 108 // The WorkerThread is non-joinable, so it deletes itself. 109 delete this; 110} 111 112} // namespace 113 114// static 115bool WorkerPool::PostTask(const tracked_objects::Location& from_here, 116 const base::Closure& task, bool task_is_slow) { 117 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); 118 return true; 119} 120 121// static 122bool WorkerPool::RunsTasksOnCurrentThread() { 123 return g_worker_pool_running_on_this_thread.Get().Get(); 124} 125 126PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix, 127 int idle_seconds_before_exit) 128 : name_prefix_(name_prefix), 129 idle_seconds_before_exit_(idle_seconds_before_exit), 130 pending_tasks_available_cv_(&lock_), 131 num_idle_threads_(0), 132 terminated_(false) {} 133 134PosixDynamicThreadPool::~PosixDynamicThreadPool() { 135 while (!pending_tasks_.empty()) 136 pending_tasks_.pop(); 137} 138 139void PosixDynamicThreadPool::Terminate() { 140 { 141 AutoLock locked(lock_); 142 DCHECK(!terminated_) << "Thread pool is already terminated."; 143 terminated_ = true; 144 } 145 pending_tasks_available_cv_.Broadcast(); 146} 147 148void PosixDynamicThreadPool::PostTask( 149 const tracked_objects::Location& from_here, 150 const base::Closure& task) { 151 PendingTask pending_task(from_here, task); 152 AddTask(&pending_task); 153} 154 155void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { 156 AutoLock locked(lock_); 157 DCHECK(!terminated_) << 158 "This thread pool is already terminated. Do not post new tasks."; 159 160 pending_tasks_.push(*pending_task); 161 pending_task->task.Reset(); 162 163 // We have enough worker threads. 164 if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { 165 pending_tasks_available_cv_.Signal(); 166 } else { 167 // The new PlatformThread will take ownership of the WorkerThread object, 168 // which will delete itself on exit. 169 WorkerThread* worker = 170 new WorkerThread(name_prefix_, this); 171 PlatformThread::CreateNonJoinable(kWorkerThreadStackSize, worker); 172 } 173} 174 175PendingTask PosixDynamicThreadPool::WaitForTask() { 176 AutoLock locked(lock_); 177 178 if (terminated_) 179 return PendingTask(FROM_HERE, base::Closure()); 180 181 if (pending_tasks_.empty()) { // No work available, wait for work. 182 num_idle_threads_++; 183 if (num_idle_threads_cv_.get()) 184 num_idle_threads_cv_->Signal(); 185 pending_tasks_available_cv_.TimedWait( 186 TimeDelta::FromSeconds(idle_seconds_before_exit_)); 187 num_idle_threads_--; 188 if (num_idle_threads_cv_.get()) 189 num_idle_threads_cv_->Signal(); 190 if (pending_tasks_.empty()) { 191 // We waited for work, but there's still no work. Return NULL to signal 192 // the thread to terminate. 193 return PendingTask(FROM_HERE, base::Closure()); 194 } 195 } 196 197 PendingTask pending_task = pending_tasks_.front(); 198 pending_tasks_.pop(); 199 return pending_task; 200} 201 202} // namespace base 203