#include "SkTaskGroup.h"

#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"

// Number of hardware threads, used when the caller asks for -1 threads.
#if defined(SK_BUILD_FOR_WIN32)
    static inline int num_cores() {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        // NOTE(review): dwNumberOfProcessors is a DWORD; implicit narrowing to
        // int is fine for realistic core counts.
        return sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static inline int num_cores() {
        return (int) sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

namespace {

// Process-global thread pool backing SkTaskGroup.
//
// At most one pool exists (gGlobal), created/destroyed by SkTaskGroup::Enabler.
// When no pool exists (gGlobal == NULL), every static entry point falls back to
// running the work synchronously on the calling thread.
//
// fReady is an SkCondVar that doubles as the mutex guarding fWork and fDraining
// (see AutoLock).  Each queued Work increments *pending before it is enqueued
// and sk_atomic_dec()s it after it runs, so a caller can wait for its own
// SkTaskGroup by watching that one counter while the queue mixes work from
// many groups.
class ThreadPool : SkNoncopyable {
public:
    // Queue an SkRunnable, or run it synchronously if there is no pool.
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(&CallRunnable, task, pending);
    }

    // Queue a bare function + argument, or run it synchronously if there is no pool.
    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
        if (!gGlobal) {
            return fn(arg);
        }
        gGlobal->add(fn, arg, pending);
    }

    // Queue fn((char*)args + i*stride) for i in [0, N), or run them all
    // synchronously in order if there is no pool.
    static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
            return;
        }
        gGlobal->batch(fn, args, N, stride, pending);
    }

    // Block until *pending drops to zero.  Instead of sleeping, the waiting
    // thread "lends a hand": it pops and runs queued Work (from any task
    // group) until the counter it cares about reaches zero.
    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    // NOTE(review): this busy-spins — re-taking the lock each
                    // iteration — until *pending hits 0; there is no condvar
                    // wait on this path.
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

private:
    // RAII lock on an SkCondVar (which supplies lock()/unlock()).
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    // Adapter so SkRunnable tasks fit the Work fn(void*) shape.
    static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }

    // One queued unit of work.
    struct Work {
        void (*fn)(void*);  // A function to call,
        void* arg;          // its argument,
        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
    };

    // threads == -1 means one worker per hardware core; workers start immediately.
    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == -1) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    // Drain protocol: set fDraining and wake every worker under the lock, then
    // join them all before freeing.  Workers only exit Loop() once the queue is
    // empty and fDraining is set.
    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    // Enqueue one Work item and wake a single worker.
    void add(void (*fn)(void*), void* arg, int32_t* pending) {
        Work work = { fn, arg, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

    // Enqueue N Work items sharing one pending counter; broadcast so every
    // worker can grab a share of the batch.
    void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
        sk_atomic_add(pending, N);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            Work* batch = fWork.append(N);
            for (int i = 0; i < N; i++) {
                Work work = { fn, (char*)arg + i*stride, pending };
                batch[i] = work;
            }
            fReady.broadcast();
        }
    }

    // Worker thread body: sleep on fReady until there is work (or the pool is
    // draining), pop one item, run it outside the lock, decrement its group's
    // pending counter, repeat.
    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;  // Queue is empty and the pool is shutting down.
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;     // Pending Work items; guarded by fReady.
    SkTDArray<SkThread*> fThreads;  // Workers; written only in ctor, joined in dtor.
    SkCondVar            fReady;    // Mutex + condition variable for fWork / fDraining.
    bool                 fDraining; // Set under fReady's lock to tell workers to exit.

    static ThreadPool* gGlobal;
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

// Creates the global pool unless threads == 0 or condition variables are not
// supported on this platform; threads == -1 requests one worker per core.
// Must not be constructed while a pool already exists.
SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    if (threads != 0 && SkCondVar::Supported()) {
        ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
    }
}

// Tears down the pool (joining all workers).  gGlobal may be NULL here if the
// pool was never created; presumably SkDELETE tolerates NULL like delete —
// TODO(review): confirm against SkPostConfig.h.
SkTaskGroup::Enabler::~Enabler() {
    SkDELETE(ThreadPool::gGlobal);
}

SkTaskGroup::SkTaskGroup() : fPending(0) {}

// Thin wrappers: each forwards to the global ThreadPool (or runs synchronously
// when no pool was enabled), using this group's fPending as the completion counter.
void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
void SkTaskGroup::batch (void (*fn)(void*), void* args, int N, size_t stride) {
    ThreadPool::Batch(fn, args, N, stride, &fPending);
}