thread_pool.h revision 7940e44f4517de5e2634a7e07d58d0fb26160513
1/* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef ART_SRC_THREAD_POOL_H_ 18#define ART_SRC_THREAD_POOL_H_ 19 20#include <deque> 21#include <vector> 22 23#include "barrier.h" 24#include "base/mutex.h" 25#include "closure.h" 26#include "locks.h" 27 28namespace art { 29 30class ThreadPool; 31 32class Task : public Closure { 33public: 34 // Called when references reaches 0. 35 virtual void Finalize() { } 36}; 37 38class ThreadPoolWorker { 39 public: 40 static const size_t kDefaultStackSize = 1 * MB; 41 42 size_t GetStackSize() const { 43 return stack_size_; 44 } 45 46 virtual ~ThreadPoolWorker(); 47 48 protected: 49 ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 50 static void* Callback(void* arg) LOCKS_EXCLUDED(Locks::mutator_lock_); 51 virtual void Run(); 52 53 ThreadPool* const thread_pool_; 54 const std::string name_; 55 const size_t stack_size_; 56 pthread_t pthread_; 57 58 friend class ThreadPool; 59 DISALLOW_COPY_AND_ASSIGN(ThreadPoolWorker); 60}; 61 62class ThreadPool { 63 public: 64 // Returns the number of threads in the thread pool. 65 size_t GetThreadCount() const { 66 return threads_.size(); 67 } 68 69 // Broadcast to the workers and tell them to empty out the work queue. 70 void StartWorkers(Thread* self); 71 72 // Do not allow workers to grab any new tasks. 73 void StopWorkers(Thread* self); 74 75 // Add a new task, the first available started worker will process it. Does not delete the task 76 // after running it, it is the caller's responsibility. 77 void AddTask(Thread* self, Task* task); 78 79 ThreadPool(size_t num_threads); 80 virtual ~ThreadPool(); 81 82 // Wait for all tasks currently on queue to get completed. 83 void Wait(Thread* self, bool do_work, bool may_hold_locks); 84 85 size_t GetTaskCount(Thread* self); 86 87 // Returns the total amount of workers waited for tasks. 88 uint64_t GetWaitTime() const { 89 return total_wait_time_; 90 } 91 92 protected: 93 // Get a task to run, blocks if there are no tasks left 94 virtual Task* GetTask(Thread* self); 95 96 // Try to get a task, returning NULL if there is none available. 97 Task* TryGetTask(Thread* self); 98 Task* TryGetTaskLocked(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_); 99 100 // Are we shutting down? 101 bool IsShuttingDown() const EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_) { 102 return shutting_down_; 103 } 104 105 Mutex task_queue_lock_; 106 ConditionVariable task_queue_condition_ GUARDED_BY(task_queue_lock_); 107 ConditionVariable completion_condition_ GUARDED_BY(task_queue_lock_); 108 volatile bool started_ GUARDED_BY(task_queue_lock_); 109 volatile bool shutting_down_ GUARDED_BY(task_queue_lock_); 110 // How many worker threads are waiting on the condition. 111 volatile size_t waiting_count_ GUARDED_BY(task_queue_lock_); 112 std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_); 113 // TODO: make this immutable/const? 114 std::vector<ThreadPoolWorker*> threads_; 115 // Work balance detection. 116 uint64_t start_time_ GUARDED_BY(task_queue_lock_); 117 uint64_t total_wait_time_; 118 Barrier creation_barier_; 119 120 friend class ThreadPoolWorker; 121 friend class WorkStealingWorker; 122 DISALLOW_COPY_AND_ASSIGN(ThreadPool); 123}; 124 125class WorkStealingTask : public Task { 126 public: 127 WorkStealingTask() : ref_count_(0) { 128 129 } 130 131 size_t GetRefCount() const { 132 return ref_count_; 133 } 134 135 virtual void StealFrom(Thread* self, WorkStealingTask* source) = 0; 136 137 private: 138 // How many people are referencing this task. 139 size_t ref_count_; 140 141 friend class WorkStealingWorker; 142}; 143 144class WorkStealingWorker : public ThreadPoolWorker { 145 public: 146 virtual ~WorkStealingWorker(); 147 148 bool IsRunningTask() const { 149 return task_ != NULL; 150 } 151 152 protected: 153 WorkStealingTask* task_; 154 155 WorkStealingWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 156 virtual void Run(); 157 158 friend class WorkStealingThreadPool; 159 DISALLOW_COPY_AND_ASSIGN(WorkStealingWorker); 160}; 161 162class WorkStealingThreadPool : public ThreadPool { 163 public: 164 WorkStealingThreadPool(size_t num_threads); 165 virtual ~WorkStealingThreadPool(); 166 167 private: 168 Mutex work_steal_lock_; 169 // Which thread we are stealing from (round robin). 170 size_t steal_index_; 171 172 // Find a task to steal from 173 WorkStealingTask* FindTaskToStealFrom(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(work_steal_lock_); 174 175 friend class WorkStealingWorker; 176}; 177 178} // namespace art 179 180#endif // ART_SRC_THREAD_POOL_H_ 181