1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "base/threading/worker_pool_posix.h" 6 7#include <stddef.h> 8 9#include "base/bind.h" 10#include "base/callback.h" 11#include "base/lazy_instance.h" 12#include "base/logging.h" 13#include "base/macros.h" 14#include "base/memory/ref_counted.h" 15#include "base/strings/stringprintf.h" 16#include "base/threading/platform_thread.h" 17#include "base/threading/thread_local.h" 18#include "base/threading/worker_pool.h" 19#include "base/trace_event/trace_event.h" 20#include "base/tracked_objects.h" 21 22using tracked_objects::TrackedTime; 23 24namespace base { 25 26namespace { 27 28base::LazyInstance<ThreadLocalBoolean>::Leaky 29 g_worker_pool_running_on_this_thread = LAZY_INSTANCE_INITIALIZER; 30 31const int kIdleSecondsBeforeExit = 10 * 60; 32 33class WorkerPoolImpl { 34 public: 35 WorkerPoolImpl(); 36 ~WorkerPoolImpl(); 37 38 void PostTask(const tracked_objects::Location& from_here, 39 const base::Closure& task, 40 bool task_is_slow); 41 42 private: 43 scoped_refptr<base::PosixDynamicThreadPool> pool_; 44}; 45 46WorkerPoolImpl::WorkerPoolImpl() 47 : pool_(new base::PosixDynamicThreadPool("WorkerPool", 48 kIdleSecondsBeforeExit)) {} 49 50WorkerPoolImpl::~WorkerPoolImpl() { 51 pool_->Terminate(); 52} 53 54void WorkerPoolImpl::PostTask(const tracked_objects::Location& from_here, 55 const base::Closure& task, 56 bool /* task_is_slow */) { 57 pool_->PostTask(from_here, task); 58} 59 60base::LazyInstance<WorkerPoolImpl> g_lazy_worker_pool = 61 LAZY_INSTANCE_INITIALIZER; 62 63class WorkerThread : public PlatformThread::Delegate { 64 public: 65 WorkerThread(const std::string& name_prefix, 66 base::PosixDynamicThreadPool* pool) 67 : name_prefix_(name_prefix), pool_(pool) {} 68 69 void ThreadMain() override; 70 71 private: 72 const std::string name_prefix_; 73 scoped_refptr<base::PosixDynamicThreadPool> pool_; 74 75 DISALLOW_COPY_AND_ASSIGN(WorkerThread); 76}; 77 78void WorkerThread::ThreadMain() { 79 g_worker_pool_running_on_this_thread.Get().Set(true); 80 const std::string name = base::StringPrintf("%s/%d", name_prefix_.c_str(), 81 PlatformThread::CurrentId()); 82 // Note |name.c_str()| must remain valid for for the whole life of the thread. 83 PlatformThread::SetName(name); 84 85 for (;;) { 86 PendingTask pending_task = pool_->WaitForTask(); 87 if (pending_task.task.is_null()) 88 break; 89 TRACE_EVENT2("toplevel", "WorkerThread::ThreadMain::Run", 90 "src_file", pending_task.posted_from.file_name(), 91 "src_func", pending_task.posted_from.function_name()); 92 93 tracked_objects::TaskStopwatch stopwatch; 94 stopwatch.Start(); 95 pending_task.task.Run(); 96 stopwatch.Stop(); 97 98 tracked_objects::ThreadData::TallyRunOnWorkerThreadIfTracking( 99 pending_task.birth_tally, pending_task.time_posted, stopwatch); 100 } 101 102 // The WorkerThread is non-joinable, so it deletes itself. 103 delete this; 104} 105 106} // namespace 107 108// static 109bool WorkerPool::PostTask(const tracked_objects::Location& from_here, 110 const base::Closure& task, 111 bool task_is_slow) { 112 g_lazy_worker_pool.Pointer()->PostTask(from_here, task, task_is_slow); 113 return true; 114} 115 116// static 117bool WorkerPool::RunsTasksOnCurrentThread() { 118 return g_worker_pool_running_on_this_thread.Get().Get(); 119} 120 121PosixDynamicThreadPool::PosixDynamicThreadPool(const std::string& name_prefix, 122 int idle_seconds_before_exit) 123 : name_prefix_(name_prefix), 124 idle_seconds_before_exit_(idle_seconds_before_exit), 125 pending_tasks_available_cv_(&lock_), 126 num_idle_threads_(0), 127 terminated_(false) {} 128 129PosixDynamicThreadPool::~PosixDynamicThreadPool() { 130 while (!pending_tasks_.empty()) 131 pending_tasks_.pop(); 132} 133 134void PosixDynamicThreadPool::Terminate() { 135 { 136 AutoLock locked(lock_); 137 DCHECK(!terminated_) << "Thread pool is already terminated."; 138 terminated_ = true; 139 } 140 pending_tasks_available_cv_.Broadcast(); 141} 142 143void PosixDynamicThreadPool::PostTask( 144 const tracked_objects::Location& from_here, 145 const base::Closure& task) { 146 PendingTask pending_task(from_here, task); 147 AddTask(&pending_task); 148} 149 150void PosixDynamicThreadPool::AddTask(PendingTask* pending_task) { 151 AutoLock locked(lock_); 152 DCHECK(!terminated_) 153 << "This thread pool is already terminated. Do not post new tasks."; 154 155 pending_tasks_.push(*pending_task); 156 pending_task->task.Reset(); 157 158 // We have enough worker threads. 159 if (static_cast<size_t>(num_idle_threads_) >= pending_tasks_.size()) { 160 pending_tasks_available_cv_.Signal(); 161 } else { 162 // The new PlatformThread will take ownership of the WorkerThread object, 163 // which will delete itself on exit. 164 WorkerThread* worker = new WorkerThread(name_prefix_, this); 165 PlatformThread::CreateNonJoinable(0, worker); 166 } 167} 168 169PendingTask PosixDynamicThreadPool::WaitForTask() { 170 AutoLock locked(lock_); 171 172 if (terminated_) 173 return PendingTask(FROM_HERE, base::Closure()); 174 175 if (pending_tasks_.empty()) { // No work available, wait for work. 176 num_idle_threads_++; 177 if (num_idle_threads_cv_.get()) 178 num_idle_threads_cv_->Signal(); 179 pending_tasks_available_cv_.TimedWait( 180 TimeDelta::FromSeconds(idle_seconds_before_exit_)); 181 num_idle_threads_--; 182 if (num_idle_threads_cv_.get()) 183 num_idle_threads_cv_->Signal(); 184 if (pending_tasks_.empty()) { 185 // We waited for work, but there's still no work. Return NULL to signal 186 // the thread to terminate. 187 return PendingTask(FROM_HERE, base::Closure()); 188 } 189 } 190 191 PendingTask pending_task = pending_tasks_.front(); 192 pending_tasks_.pop(); 193 return pending_task; 194} 195 196} // namespace base 197