thread_pool.h revision 02c8cc6d1312a2b55533f02f6369dc7c94672f90
1/* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef ART_RUNTIME_THREAD_POOL_H_ 18#define ART_RUNTIME_THREAD_POOL_H_ 19 20#include <deque> 21#include <vector> 22 23#include "barrier.h" 24#include "base/mutex.h" 25#include "closure.h" 26#include "locks.h" 27 28namespace art { 29 30class ThreadPool; 31 32class Task : public Closure { 33 public: 34 // Called when references reaches 0. 35 virtual void Finalize() { } 36}; 37 38class ThreadPoolWorker { 39 public: 40 static const size_t kDefaultStackSize = 1 * MB; 41 42 size_t GetStackSize() const { 43 return stack_size_; 44 } 45 46 virtual ~ThreadPoolWorker(); 47 48 protected: 49 ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 50 static void* Callback(void* arg) LOCKS_EXCLUDED(Locks::mutator_lock_); 51 virtual void Run(); 52 53 ThreadPool* const thread_pool_; 54 const std::string name_; 55 const size_t stack_size_; 56 pthread_t pthread_; 57 58 friend class ThreadPool; 59 DISALLOW_COPY_AND_ASSIGN(ThreadPoolWorker); 60}; 61 62class ThreadPool { 63 public: 64 // Returns the number of threads in the thread pool. 65 size_t GetThreadCount() const { 66 return threads_.size(); 67 } 68 69 // Broadcast to the workers and tell them to empty out the work queue. 70 void StartWorkers(Thread* self); 71 72 // Do not allow workers to grab any new tasks. 73 void StopWorkers(Thread* self); 74 75 // Add a new task, the first available started worker will process it. Does not delete the task 76 // after running it, it is the caller's responsibility. 77 void AddTask(Thread* self, Task* task); 78 79 explicit ThreadPool(size_t num_threads); 80 virtual ~ThreadPool(); 81 82 // Wait for all tasks currently on queue to get completed. 83 void Wait(Thread* self, bool do_work, bool may_hold_locks); 84 85 size_t GetTaskCount(Thread* self); 86 87 // Returns the total amount of workers waited for tasks. 88 uint64_t GetWaitTime() const { 89 return total_wait_time_; 90 } 91 92 protected: 93 // Get a task to run, blocks if there are no tasks left 94 virtual Task* GetTask(Thread* self); 95 96 // Try to get a task, returning NULL if there is none available. 97 Task* TryGetTask(Thread* self); 98 Task* TryGetTaskLocked(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_); 99 100 // Are we shutting down? 101 bool IsShuttingDown() const EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_) { 102 return shutting_down_; 103 } 104 105 Mutex task_queue_lock_; 106 ConditionVariable task_queue_condition_ GUARDED_BY(task_queue_lock_); 107 ConditionVariable completion_condition_ GUARDED_BY(task_queue_lock_); 108 volatile bool started_ GUARDED_BY(task_queue_lock_); 109 volatile bool shutting_down_ GUARDED_BY(task_queue_lock_); 110 // How many worker threads are waiting on the condition. 111 volatile size_t waiting_count_ GUARDED_BY(task_queue_lock_); 112 std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_); 113 // TODO: make this immutable/const? 114 std::vector<ThreadPoolWorker*> threads_; 115 // Work balance detection. 116 uint64_t start_time_ GUARDED_BY(task_queue_lock_); 117 uint64_t total_wait_time_; 118 Barrier creation_barier_; 119 120 friend class ThreadPoolWorker; 121 friend class WorkStealingWorker; 122 DISALLOW_COPY_AND_ASSIGN(ThreadPool); 123}; 124 125class WorkStealingTask : public Task { 126 public: 127 WorkStealingTask() : ref_count_(0) {} 128 129 size_t GetRefCount() const { 130 return ref_count_; 131 } 132 133 virtual void StealFrom(Thread* self, WorkStealingTask* source) = 0; 134 135 private: 136 // How many people are referencing this task. 137 size_t ref_count_; 138 139 friend class WorkStealingWorker; 140}; 141 142class WorkStealingWorker : public ThreadPoolWorker { 143 public: 144 virtual ~WorkStealingWorker(); 145 146 bool IsRunningTask() const { 147 return task_ != NULL; 148 } 149 150 protected: 151 WorkStealingTask* task_; 152 153 WorkStealingWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 154 virtual void Run(); 155 156 friend class WorkStealingThreadPool; 157 DISALLOW_COPY_AND_ASSIGN(WorkStealingWorker); 158}; 159 160class WorkStealingThreadPool : public ThreadPool { 161 public: 162 explicit WorkStealingThreadPool(size_t num_threads); 163 virtual ~WorkStealingThreadPool(); 164 165 private: 166 Mutex work_steal_lock_; 167 // Which thread we are stealing from (round robin). 168 size_t steal_index_; 169 170 // Find a task to steal from 171 WorkStealingTask* FindTaskToStealFrom(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(work_steal_lock_); 172 173 friend class WorkStealingWorker; 174}; 175 176} // namespace art 177 178#endif // ART_RUNTIME_THREAD_POOL_H_ 179