SkTaskGroup.cpp revision 89889b69391a730f0ba2a1efb549864b7762263f
#include "SkTaskGroup.h"

#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
    static inline int num_cores() {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        return sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static inline int num_cores() {
        return (int) sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

namespace {

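// The process-global worker pool behind SkTaskGroup.  Work items queue up in fWork under
// the fReady condition variable's lock, and worker threads pull them off in Loop().  Each
// Work carries a pointer to its SkTaskGroup's fPending counter, incremented in add() and
// decremented after the work runs, so Wait() can tell when a particular group has drained.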
class ThreadPool : SkNoncopyable {
public:
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(&CallRunnable, task, pending);
    }

    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
        if (!gGlobal) {
            return fn(arg);
        }
        gGlobal->add(fn, arg, pending);
    }

    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }

    struct Work {
        void (*fn)(void*);  // A function to call,
        void* arg;          // its argument,
        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
    };

    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == -1) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    void add(void (*fn)(void*), void* arg, int32_t* pending) {
        Work work = { fn, arg, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;
    SkTDArray<SkThread*> fThreads;
    SkCondVar            fReady;
    bool                 fDraining;

    static ThreadPool* gGlobal;
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

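// SkTaskGroup::Enabler owns the global pool: threads == -1 spins up one worker per core,
// any positive count spins up that many, and 0 leaves gGlobal NULL so all work runs
// synchronously on the calling thread.  The destructor joins and deletes the workers;
// all queued work must already be complete by then.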
SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    if (threads != 0) {
        ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
    }
}

SkTaskGroup::Enabler::~Enabler() {
    SkDELETE(ThreadPool::gGlobal);
}

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
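
// A minimal usage sketch (illustrative only, not part of this file): client code creates an
// SkTaskGroup::Enabler that outlives any SkTaskGroup, fans work out through a group, and
// blocks on wait().  The square() helper, values array, and count below are made up.
//
//     static void square(void* arg) { int* n = (int*)arg; *n *= *n; }
//
//     void run_squares(int* values, int count) {
//         SkTaskGroup::Enabler enabled(-1);   // -1: one worker thread per core.
//         SkTaskGroup tg;
//         for (int i = 0; i < count; i++) {
//             tg.add(&square, &values[i]);    // Each add() bumps tg's pending count.
//         }
//         tg.wait();                          // Helps run work until the count hits zero.
//     }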