SkTaskGroup.cpp revision 89889b69391a730f0ba2a1efb549864b7762263f
1#include "SkTaskGroup.h" 2 3#include "SkCondVar.h" 4#include "SkRunnable.h" 5#include "SkTDArray.h" 6#include "SkThread.h" 7#include "SkThreadUtils.h" 8 9#if defined(SK_BUILD_FOR_WIN32) 10 static inline int num_cores() { 11 SYSTEM_INFO sysinfo; 12 GetSystemInfo(&sysinfo); 13 return sysinfo.dwNumberOfProcessors; 14 } 15#else 16 #include <unistd.h> 17 static inline int num_cores() { 18 return (int) sysconf(_SC_NPROCESSORS_ONLN); 19 } 20#endif 21 22namespace { 23 24class ThreadPool : SkNoncopyable { 25public: 26 static void Add(SkRunnable* task, int32_t* pending) { 27 if (!gGlobal) { // If we have no threads, run synchronously. 28 return task->run(); 29 } 30 gGlobal->add(&CallRunnable, task, pending); 31 } 32 33 static void Add(void (*fn)(void*), void* arg, int32_t* pending) { 34 if (!gGlobal) { 35 return fn(arg); 36 } 37 gGlobal->add(fn, arg, pending); 38 } 39 40 static void Wait(int32_t* pending) { 41 if (!gGlobal) { // If we have no threads, the work must already be done. 42 SkASSERT(*pending == 0); 43 return; 44 } 45 while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here or in Loop. 46 // Lend a hand until our SkTaskGroup of interest is done. 47 Work work; 48 { 49 AutoLock lock(&gGlobal->fReady); 50 if (gGlobal->fWork.isEmpty()) { 51 // Someone has picked up all the work (including ours). How nice of them! 52 // (They may still be working on it, so we can't assert *pending == 0 here.) 53 continue; 54 } 55 gGlobal->fWork.pop(&work); 56 } 57 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. 58 // We threads gotta stick together. We're always making forward progress. 59 work.fn(work.arg); 60 sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_load() just above. 61 } 62 } 63 64private: 65 struct AutoLock { 66 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } 67 ~AutoLock() { fC->unlock(); } 68 private: 69 SkCondVar* fC; 70 }; 71 72 static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); } 73 74 struct Work { 75 void (*fn)(void*); // A function to call, 76 void* arg; // its argument, 77 int32_t* pending; // then sk_atomic_dec(pending) afterwards. 78 }; 79 80 explicit ThreadPool(int threads) : fDraining(false) { 81 if (threads == -1) { 82 threads = num_cores(); 83 } 84 for (int i = 0; i < threads; i++) { 85 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); 86 fThreads.top()->start(); 87 } 88 } 89 90 ~ThreadPool() { 91 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. 92 { 93 AutoLock lock(&fReady); 94 fDraining = true; 95 fReady.broadcast(); 96 } 97 for (int i = 0; i < fThreads.count(); i++) { 98 fThreads[i]->join(); 99 } 100 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. 101 fThreads.deleteAll(); 102 } 103 104 void add(void (*fn)(void*), void* arg, int32_t* pending) { 105 Work work = { fn, arg, pending }; 106 sk_atomic_inc(pending); // No barrier needed. 107 { 108 AutoLock lock(&fReady); 109 fWork.push(work); 110 fReady.signal(); 111 } 112 } 113 114 static void Loop(void* arg) { 115 ThreadPool* pool = (ThreadPool*)arg; 116 Work work; 117 while (true) { 118 { 119 AutoLock lock(&pool->fReady); 120 while (pool->fWork.isEmpty()) { 121 if (pool->fDraining) { 122 return; 123 } 124 pool->fReady.wait(); 125 } 126 pool->fWork.pop(&work); 127 } 128 work.fn(work.arg); 129 sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load() in Wait(). 130 } 131 } 132 133 SkTDArray<Work> fWork; 134 SkTDArray<SkThread*> fThreads; 135 SkCondVar fReady; 136 bool fDraining; 137 138 static ThreadPool* gGlobal; 139 friend struct SkTaskGroup::Enabler; 140}; 141ThreadPool* ThreadPool::gGlobal = NULL; 142 143} // namespace 144 145SkTaskGroup::Enabler::Enabler(int threads) { 146 SkASSERT(ThreadPool::gGlobal == NULL); 147 if (threads != 0) { 148 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); 149 } 150} 151 152SkTaskGroup::Enabler::~Enabler() { 153 SkDELETE(ThreadPool::gGlobal); 154} 155 156SkTaskGroup::SkTaskGroup() : fPending(0) {} 157 158void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } 159void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); } 160void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } 161 162