thread_pool.cc revision 7940e44f4517de5e2634a7e07d58d0fb26160513
1/* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "thread_pool.h" 18 19#include "base/casts.h" 20#include "base/stl_util.h" 21#include "runtime.h" 22#include "thread.h" 23 24namespace art { 25 26ThreadPoolWorker::ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, 27 size_t stack_size) 28 : thread_pool_(thread_pool), 29 name_(name), 30 stack_size_(stack_size) { 31 const char* reason = "new thread pool worker thread"; 32 pthread_attr_t attr; 33 CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), reason); 34 CHECK_PTHREAD_CALL(pthread_attr_setstacksize, (&attr, stack_size), reason); 35 CHECK_PTHREAD_CALL(pthread_create, (&pthread_, &attr, &Callback, this), reason); 36 CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), reason); 37} 38 39ThreadPoolWorker::~ThreadPoolWorker() { 40 CHECK_PTHREAD_CALL(pthread_join, (pthread_, NULL), "thread pool worker shutdown"); 41} 42 43void ThreadPoolWorker::Run() { 44 Thread* self = Thread::Current(); 45 Task* task = NULL; 46 thread_pool_->creation_barier_.Wait(self); 47 while ((task = thread_pool_->GetTask(self)) != NULL) { 48 task->Run(self); 49 task->Finalize(); 50 } 51} 52 53void* ThreadPoolWorker::Callback(void* arg) { 54 ThreadPoolWorker* worker = reinterpret_cast<ThreadPoolWorker*>(arg); 55 Runtime* runtime = Runtime::Current(); 56 CHECK(runtime->AttachCurrentThread(worker->name_.c_str(), true, NULL, false)); 57 // Do work until its time to shut down. 58 worker->Run(); 59 runtime->DetachCurrentThread(); 60 return NULL; 61} 62 63void ThreadPool::AddTask(Thread* self, Task* task){ 64 MutexLock mu(self, task_queue_lock_); 65 tasks_.push_back(task); 66 // If we have any waiters, signal one. 67 if (waiting_count_ != 0) { 68 task_queue_condition_.Signal(self); 69 } 70} 71 72ThreadPool::ThreadPool(size_t num_threads) 73 : task_queue_lock_("task queue lock"), 74 task_queue_condition_("task queue condition", task_queue_lock_), 75 completion_condition_("task completion condition", task_queue_lock_), 76 started_(false), 77 shutting_down_(false), 78 waiting_count_(0), 79 start_time_(0), 80 total_wait_time_(0), 81 // Add one since the caller of constructor waits on the barrier too. 82 creation_barier_(num_threads + 1) { 83 Thread* self = Thread::Current(); 84 while (GetThreadCount() < num_threads) { 85 const std::string name = StringPrintf("Thread pool worker %zu", GetThreadCount()); 86 threads_.push_back(new ThreadPoolWorker(this, name, ThreadPoolWorker::kDefaultStackSize)); 87 } 88 // Wait for all of the threads to attach. 89 creation_barier_.Wait(self); 90} 91 92ThreadPool::~ThreadPool() { 93 { 94 Thread* self = Thread::Current(); 95 MutexLock mu(self, task_queue_lock_); 96 // Tell any remaining workers to shut down. 97 shutting_down_ = true; 98 // Broadcast to everyone waiting. 99 task_queue_condition_.Broadcast(self); 100 completion_condition_.Broadcast(self); 101 } 102 // Wait for the threads to finish. 103 STLDeleteElements(&threads_); 104} 105 106void ThreadPool::StartWorkers(Thread* self) { 107 MutexLock mu(self, task_queue_lock_); 108 started_ = true; 109 task_queue_condition_.Broadcast(self); 110 start_time_ = NanoTime(); 111 total_wait_time_ = 0; 112} 113 114void ThreadPool::StopWorkers(Thread* self) { 115 MutexLock mu(self, task_queue_lock_); 116 started_ = false; 117} 118 119Task* ThreadPool::GetTask(Thread* self) { 120 MutexLock mu(self, task_queue_lock_); 121 while (!IsShuttingDown()) { 122 Task* task = TryGetTaskLocked(self); 123 if (task != NULL) { 124 return task; 125 } 126 127 waiting_count_++; 128 if (waiting_count_ == GetThreadCount() && tasks_.empty()) { 129 // We may be done, lets broadcast to the completion condition. 130 completion_condition_.Broadcast(self); 131 } 132 const uint64_t wait_start = NanoTime(); 133 task_queue_condition_.Wait(self); 134 const uint64_t wait_end = NanoTime(); 135 total_wait_time_ += wait_end - std::max(wait_start, start_time_); 136 waiting_count_--; 137 } 138 139 // We are shutting down, return NULL to tell the worker thread to stop looping. 140 return NULL; 141} 142 143Task* ThreadPool::TryGetTask(Thread* self) { 144 MutexLock mu(self, task_queue_lock_); 145 return TryGetTaskLocked(self); 146} 147 148Task* ThreadPool::TryGetTaskLocked(Thread* self) { 149 if (started_ && !tasks_.empty()) { 150 Task* task = tasks_.front(); 151 tasks_.pop_front(); 152 return task; 153 } 154 return NULL; 155} 156 157void ThreadPool::Wait(Thread* self, bool do_work, bool may_hold_locks) { 158 if (do_work) { 159 Task* task = NULL; 160 while ((task = TryGetTask(self)) != NULL) { 161 task->Run(self); 162 task->Finalize(); 163 } 164 } 165 // Wait until each thread is waiting and the task list is empty. 166 MutexLock mu(self, task_queue_lock_); 167 while (!shutting_down_ && (waiting_count_ != GetThreadCount() || !tasks_.empty())) { 168 if (!may_hold_locks) { 169 completion_condition_.Wait(self); 170 } else { 171 completion_condition_.WaitHoldingLocks(self); 172 } 173 } 174} 175 176size_t ThreadPool::GetTaskCount(Thread* self){ 177 MutexLock mu(self, task_queue_lock_); 178 return tasks_.size(); 179} 180 181WorkStealingWorker::WorkStealingWorker(ThreadPool* thread_pool, const std::string& name, 182 size_t stack_size) 183 : ThreadPoolWorker(thread_pool, name, stack_size), 184 task_(NULL) { 185 186} 187 188void WorkStealingWorker::Run() { 189 Thread* self = Thread::Current(); 190 Task* task = NULL; 191 WorkStealingThreadPool* thread_pool = down_cast<WorkStealingThreadPool*>(thread_pool_); 192 while ((task = thread_pool_->GetTask(self)) != NULL) { 193 WorkStealingTask* stealing_task = down_cast<WorkStealingTask*>(task); 194 195 { 196 CHECK(task_ == NULL); 197 MutexLock mu(self, thread_pool->work_steal_lock_); 198 // Register that we are running the task 199 ++stealing_task->ref_count_; 200 task_ = stealing_task; 201 } 202 stealing_task->Run(self); 203 // Mark ourselves as not running a task so that nobody tries to steal from us. 204 // There is a race condition that someone starts stealing from us at this point. This is okay 205 // due to the reference counting. 206 task_ = NULL; 207 208 bool finalize; 209 210 // Steal work from tasks until there is none left to steal. Note: There is a race, but 211 // all that happens when the race occurs is that we steal some work instead of processing a 212 // task from the queue. 213 while (thread_pool->GetTaskCount(self) == 0) { 214 WorkStealingTask* steal_from_task = NULL; 215 216 { 217 MutexLock mu(self, thread_pool->work_steal_lock_); 218 // Try finding a task to steal from. 219 steal_from_task = thread_pool->FindTaskToStealFrom(self); 220 if (steal_from_task != NULL) { 221 CHECK_NE(stealing_task, steal_from_task) 222 << "Attempting to steal from completed self task"; 223 steal_from_task->ref_count_++; 224 } else { 225 break; 226 } 227 } 228 229 if (steal_from_task != NULL) { 230 // Task which completed earlier is going to steal some work. 231 stealing_task->StealFrom(self, steal_from_task); 232 233 { 234 // We are done stealing from the task, lets decrement its reference count. 235 MutexLock mu(self, thread_pool->work_steal_lock_); 236 finalize = !--steal_from_task->ref_count_; 237 } 238 239 if (finalize) { 240 steal_from_task->Finalize(); 241 } 242 } 243 } 244 245 { 246 MutexLock mu(self, thread_pool->work_steal_lock_); 247 // If nobody is still referencing task_ we can finalize it. 248 finalize = !--stealing_task->ref_count_; 249 } 250 251 if (finalize) { 252 stealing_task->Finalize(); 253 } 254 } 255} 256 257WorkStealingWorker::~WorkStealingWorker() { 258 259} 260 261WorkStealingThreadPool::WorkStealingThreadPool(size_t num_threads) 262 : ThreadPool(0), 263 work_steal_lock_("work stealing lock"), 264 steal_index_(0) { 265 while (GetThreadCount() < num_threads) { 266 const std::string name = StringPrintf("Work stealing worker %zu", GetThreadCount()); 267 threads_.push_back(new WorkStealingWorker(this, name, ThreadPoolWorker::kDefaultStackSize)); 268 } 269} 270 271WorkStealingTask* WorkStealingThreadPool::FindTaskToStealFrom(Thread* self) { 272 const size_t thread_count = GetThreadCount(); 273 for (size_t i = 0; i < thread_count; ++i) { 274 // TODO: Use CAS instead of lock. 275 ++steal_index_; 276 if (steal_index_ >= thread_count) { 277 steal_index_-= thread_count; 278 } 279 280 WorkStealingWorker* worker = down_cast<WorkStealingWorker*>(threads_[steal_index_]); 281 WorkStealingTask* task = worker->task_; 282 if (task) { 283 // Not null, we can probably steal from this worker. 284 return task; 285 } 286 } 287 // Couldn't find something to steal. 288 return NULL; 289} 290 291WorkStealingThreadPool::~WorkStealingThreadPool() { 292 293} 294 295} // namespace art 296