worker_pool_posix.cc revision b636ff6a8ac3b54b3067289f01848252ab71eceb
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "base/threading/worker_pool_posix.h" 6 7#include "base/bind.h" 8#include "base/callback.h" 9#include "base/lazy_instance.h" 10#include "base/logging.h" 11#include "base/memory/ref_counted.h" 12#include "base/strings/stringprintf.h" 13#include "base/threading/platform_thread.h" 14#include "base/threading/thread_local.h" 15#include "base/threading/worker_pool.h" 16#include "base/trace_event/trace_event.h" 17#include "base/tracked_objects.h" 18 19using tracked_objects::TrackedTime; 20 21namespace base { 22 23namespace { 24 25base::LazyInstance<ThreadLocalBoolean>::Leaky 26 g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER; 27 28const int kIdleSecondsBeforeExit = 10 * 60; 29 30class WorkerPoolImpl { 31 public: 32 WorkerPoolImpl(); 33 ~WorkerPoolImpl(); 34 35 void PostTask(const tracked_objects::Location& from_here, 36 const base::Closure& task, bool task_is_slow); 37 38 private: 39 scoped_refptr<base::PosixDynamicThreadPool> pool_; 40}; 41 42WorkerPoolImpl::WorkerPoolImpl() 43 : pool_(new base::PosixDynamicThreadPool("WorkerPool", 44 kIdleSecondsBeforeExit)) { 45} 46 47WorkerPoolImpl::~WorkerPoolImpl() { 48 pool_->Terminate(); 49} 50 51void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, 52 const base::Closure& task, 53 bool /* task_is_slow */) { 54 pool_->PostTask(from_here, task); 55} 56 57base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = 58 LAZY_INSTANCE_INITIALIZER; 59 60class WorkerThread : public PlatformThread::Delegate { 61 public: 62 WorkerThread(const std::string& name_prefix, 63 base::PosixDynamicThreadPool* pool) 64 : name_prefix_(name_prefix), 65 pool_(pool) {} 66 67 void ThreadMain() override; 68 69 private: 70 const std::string name_prefix_; 71 scoped_refptr<base::PosixDynamicThreadPool> pool_; 72 73 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 74}; 75 76void WorkerThread::ThreadMain() { 77 g_worker_pool_running_on_this_thread.Get().Set(true); 78 const std::string name = base::StringPrintf( 79 "%s/%d", name_prefix_.c_str(), PlatformThread::CurrentId()); 80 // Note |name.c_str()| must remain valid for for the whole life of the thread. 81 PlatformThread::SetName(name); 82 83 for (;;) { 84 PendingTask pending_task = pool_->WaitForTask(); 85 if (pending_task.task.is_null()) 86 break; 87 TRACE_EVENT2("toplevel", "WorkerThread::ThreadMain::Run", 88 "src_file", pending_task.posted_from.file_name(), 89 "src_func", pending_task.posted_from.function_name()); 90 91 tracked_objects::TaskStopwatch stopwatch; 92 stopwatch.Start(); 93 pending_task.task.Run(); 94 stopwatch.Stop(); 95 96 tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking( 97 pending_task.birth_tally, pending_task.time_posted, stopwatch); 98 } 99 100 // The WorkerThread is non-joinable, so it deletes itself. 101 delete this; 102} 103 104} // namespace 105 106// static 107bool WorkerPool::PostTask(const tracked_objects::Location& from_here, 108 const base::Closure& task, bool task_is_slow) { 109 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); 110 return true; 111} 112 113// static 114bool WorkerPool::RunsTasksOnCurrentThread() { 115 return g_worker_pool_running_on_this_thread.Get().Get(); 116} 117 118PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix, 119 int idle_seconds_before_exit) 120 : name_prefix_(name_prefix), 121 idle_seconds_before_exit_(idle_seconds_before_exit), 122 pending_tasks_available_cv_(&lock_), 123 num_idle_threads_(0), 124 terminated_(false) {} 125 126PosixDynamicThreadPool::~PosixDynamicThreadPool() { 127 while (!pending_tasks_.empty()) 128 pending_tasks_.pop(); 129} 130 131void PosixDynamicThreadPool::Terminate() { 132 { 133 AutoLock locked(lock_); 134 DCHECK(!terminated_) << "Thread pool is already terminated."; 135 terminated_ = true; 136 } 137 pending_tasks_available_cv_.Broadcast(); 138} 139 140void PosixDynamicThreadPool::PostTask( 141 const tracked_objects::Location& from_here, 142 const base::Closure& task) { 143 PendingTask pending_task(from_here, task); 144 AddTask(&pending_task); 145} 146 147void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { 148 AutoLock locked(lock_); 149 DCHECK(!terminated_) << 150 "This thread pool is already terminated. Do not post new tasks."; 151 152 pending_tasks_.push(*pending_task); 153 pending_task->task.Reset(); 154 155 // We have enough worker threads. 156 if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { 157 pending_tasks_available_cv_.Signal(); 158 } else { 159 // The new PlatformThread will take ownership of the WorkerThread object, 160 // which will delete itself on exit. 161 WorkerThread* worker = 162 new WorkerThread(name_prefix_, this); 163 PlatformThread::CreateNonJoinable(0, worker); 164 } 165} 166 167PendingTask PosixDynamicThreadPool::WaitForTask() { 168 AutoLock locked(lock_); 169 170 if (terminated_) 171 return PendingTask(FROM_HERE, base::Closure()); 172 173 if (pending_tasks_.empty()) { // No work available, wait for work. 174 num_idle_threads_++; 175 if (num_idle_threads_cv_.get()) 176 num_idle_threads_cv_->Signal(); 177 pending_tasks_available_cv_.TimedWait( 178 TimeDelta::FromSeconds(idle_seconds_before_exit_)); 179 num_idle_threads_--; 180 if (num_idle_threads_cv_.get()) 181 num_idle_threads_cv_->Signal(); 182 if (pending_tasks_.empty()) { 183 // We waited for work, but there's still no work. Return NULL to signal 184 // the thread to terminate. 185 return PendingTask(FROM_HERE, base::Closure()); 186 } 187 } 188 189 PendingTask pending_task = pending_tasks_.front(); 190 pending_tasks_.pop(); 191 return pending_task; 192} 193 194} // namespace base 195