#include "SkTaskGroup.h"

#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
    static inline int num_cores() {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        return sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static inline int num_cores() {
        return (int) sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

namespace {

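// A process-global thread pool, created (if at all) by SkTaskGroup::Enabler.  Work items
// are plain (function, argument, pending-counter) triples.  When no pool exists, the
// static entry points below just run the work synchronously on the calling thread.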
class ThreadPool : SkNoncopyable {
public:
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(&CallRunnable, task, pending);
    }

    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
        if (!gGlobal) {
            return fn(arg);
        }
        gGlobal->add(fn, arg, pending);
    }

    static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
            return;
        }
        gGlobal->batch(fn, args, N, stride, pending);
    }

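    // Wait() doesn't idle: while the caller's counter is still positive, it pops queued
    // work (from any SkTaskGroup) and runs it, so the waiting thread keeps making
    // forward progress instead of blocking.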
    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }

    struct Work {
        void (*fn)(void*);  // A function to call,
        void* arg;          // its argument,
        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
    };

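    // threads == -1 means one worker per core; otherwise spin up exactly `threads` workers.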
    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == -1) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

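    // add() and batch() bump *pending before publishing the work to fWork, so a
    // concurrent Wait() can't see the count hit zero while that work is still queued.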
    void add(void (*fn)(void*), void* arg, int32_t* pending) {
        Work work = { fn, arg, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

    void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
        sk_atomic_add(pending, N);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            Work* batch = fWork.append(N);
            for (int i = 0; i < N; i++) {
                Work work = { fn, (char*)arg + i*stride, pending };
                batch[i] = work;
            }
            fReady.broadcast();
        }
    }

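    // Each worker thread runs Loop(): sleep on fReady until there's work (or we're
    // draining at shutdown), pop one item, run it, then decrement its pending counter.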
    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;
    SkTDArray<SkThread*> fThreads;
    SkCondVar            fReady;
    bool                 fDraining;

    static ThreadPool* gGlobal;
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

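// An Enabler with a nonzero thread count creates the global pool (threads == -1 means one
// worker per core).  With zero threads, or when condition variables aren't supported,
// every SkTaskGroup runs its work synchronously on the calling thread.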
SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    if (threads != 0 && SkCondVar::Supported()) {
        ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
    }
}

SkTaskGroup::Enabler::~Enabler() {
    SkDELETE(ThreadPool::gGlobal);
}

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
void SkTaskGroup::batch(void (*fn)(void*), void* args, int N, size_t stride) {
    ThreadPool::Batch(fn, args, N, stride, &fPending);
}
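// A minimal usage sketch (illustrative only; my_fn/my_arg/my_args/N are placeholders, and
// the declarations in SkTaskGroup.h are assumed to match the definitions above):
//
//     SkTaskGroup::Enabler enabler(-1);                    // One worker per core for this scope.
//     SkTaskGroup group;
//     group.add(my_fn, my_arg);                            // my_fn has signature void(void*).
//     group.batch(my_fn, my_args, N, sizeof(my_args[0]));  // N items, sizeof(my_args[0]) apart.
//     group.wait();                                        // Runs queued work until group's count is 0.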