1#include "SkTaskGroup.h" 2 3#include "SkCondVar.h" 4#include "SkTDArray.h" 5#include "SkThread.h" 6#include "SkThreadUtils.h" 7 8#if defined(SK_BUILD_FOR_WIN32) 9 static inline int num_cores() { 10 SYSTEM_INFO sysinfo; 11 GetSystemInfo(&sysinfo); 12 return sysinfo.dwNumberOfProcessors; 13 } 14#else 15 #include <unistd.h> 16 static inline int num_cores() { 17 return (int) sysconf(_SC_NPROCESSORS_ONLN); 18 } 19#endif 20 21namespace { 22 23class ThreadPool : SkNoncopyable { 24public: 25 static void Add(SkRunnable* task, int32_t* pending) { 26 if (!gGlobal) { // If we have no threads, run synchronously. 27 return task->run(); 28 } 29 gGlobal->add(task, pending); 30 } 31 32 static void Wait(int32_t* pending) { 33 if (!gGlobal) { // If we have no threads, the work must already be done. 34 SkASSERT(*pending == 0); 35 return; 36 } 37 while (sk_acquire_load(pending) > 0) { // Pairs with sk_atomic_dec here or in Loop. 38 // Lend a hand until our SkTaskGroup of interest is done. 39 Work work; 40 { 41 AutoLock lock(&gGlobal->fReady); 42 if (gGlobal->fWork.isEmpty()) { 43 // Someone has picked up all the work (including ours). How nice of them! 44 // (They may still be working on it, so we can't assert *pending == 0 here.) 45 continue; 46 } 47 gGlobal->fWork.pop(&work); 48 } 49 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. 50 // We threads gotta stick together. We're always making forward progress. 51 work.task->run(); 52 sk_atomic_dec(work.pending); // Release pairs with the sk_acquire_load() just above. 53 } 54 } 55 56private: 57 struct AutoLock { 58 AutoLock(SkCondVar* c) : fC(c) { fC->lock(); } 59 ~AutoLock() { fC->unlock(); } 60 private: 61 SkCondVar* fC; 62 }; 63 64 struct Work { 65 SkRunnable* task; // A task to ->run(), 66 int32_t* pending; // then sk_atomic_dec(pending) afterwards. 67 }; 68 69 explicit ThreadPool(int threads) : fDraining(false) { 70 if (threads == 0) { 71 threads = num_cores(); 72 } 73 for (int i = 0; i < threads; i++) { 74 fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this))); 75 fThreads.top()->start(); 76 } 77 } 78 79 ~ThreadPool() { 80 SkASSERT(fWork.isEmpty()); // All SkTaskGroups should be destroyed by now. 81 { 82 AutoLock lock(&fReady); 83 fDraining = true; 84 fReady.broadcast(); 85 } 86 for (int i = 0; i < fThreads.count(); i++) { 87 fThreads[i]->join(); 88 } 89 SkASSERT(fWork.isEmpty()); // Can't hurt to double check. 90 fThreads.deleteAll(); 91 } 92 93 void add(SkRunnable* task, int32_t* pending) { 94 Work work = { task, pending }; 95 sk_atomic_inc(pending); // No barrier needed. 96 { 97 AutoLock lock(&fReady); 98 fWork.push(work); 99 fReady.signal(); 100 } 101 } 102 103 static void Loop(void* arg) { 104 ThreadPool* pool = (ThreadPool*)arg; 105 Work work; 106 while (true) { 107 { 108 AutoLock lock(&pool->fReady); 109 while (pool->fWork.isEmpty()) { 110 if (pool->fDraining) { 111 return; 112 } 113 pool->fReady.wait(); 114 } 115 pool->fWork.pop(&work); 116 } 117 work.task->run(); 118 sk_atomic_dec(work.pending); // Release pairs with sk_acquire_load() in Wait(). 119 } 120 } 121 122 SkTDArray<Work> fWork; 123 SkTDArray<SkThread*> fThreads; 124 SkCondVar fReady; 125 bool fDraining; 126 127 static ThreadPool* gGlobal; 128 friend struct SkTaskGroup::Enabler; 129}; 130ThreadPool* ThreadPool::gGlobal = NULL; 131 132} // namespace 133 134SkTaskGroup::Enabler::Enabler(int threads) { 135 SkASSERT(ThreadPool::gGlobal == NULL); 136 ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads)); 137} 138 139SkTaskGroup::Enabler::~Enabler() { 140 SkASSERT(ThreadPool::gGlobal != NULL); 141 SkDELETE(ThreadPool::gGlobal); 142} 143 144SkTaskGroup::SkTaskGroup() : fPending(0) {} 145 146void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); } 147void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } 148 149