#include "SkTaskGroup.h"

#include "SkCondVar.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
    #include <windows.h>
    static inline int num_cores() {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        return (int) sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static inline int num_cores() {
        return (int) sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

namespace {

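// A global pool of threads pulling Work off a shared queue.  There is at most
// one ThreadPool per process, created by SkTaskGroup::Enabler; with no Enabler
// in scope, gGlobal stays NULL and tasks run synchronously in Add().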
class ThreadPool : SkNoncopyable {
public:
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(task, pending);
    }

    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.task->run();
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }
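
    // Note on Wait(): when the queue is empty but *pending is still positive,
    // some worker is finishing our last task, so we spin, retaking the lock
    // each pass, until it decrements *pending.  Waiters never sleep: they
    // either help with queued work or spin.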

private:
    // Scoped lock/unlock of an SkCondVar's mutex.
    struct AutoLock {
        explicit AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    struct Work {
        SkRunnable* task;  // A task to ->run(),
        int32_t* pending;  // then sk_atomic_dec(pending) afterwards.
    };

    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == 0) {
            threads = num_cores();  // Zero threads requested means one per hardware core.
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    void add(SkRunnable* task, int32_t* pending) {
        Work work = { task, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

    // Each pool thread runs Loop() until ~ThreadPool() sets fDraining.
    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.task->run();
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;      // Queue of pending Work, guarded by fReady.
    SkTDArray<SkThread*> fThreads;
    SkCondVar            fReady;
    bool                 fDraining;  // Set in ~ThreadPool(), guarded by fReady.

    static ThreadPool* gGlobal;
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
}

SkTaskGroup::Enabler::~Enabler() {
    SkASSERT(ThreadPool::gGlobal != NULL);
    SkDELETE(ThreadPool::gGlobal);
    ThreadPool::gGlobal = NULL;  // Later adds run synchronously; a new Enabler may be created.
}

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::add(SkRunnable* task) { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::wait()                { ThreadPool::Wait(&fPending); }
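
// A minimal usage sketch (PrintTask is a hypothetical SkRunnable subclass;
// tasks must outlive the wait() call):
//
//     SkTaskGroup::Enabler enabled(0);  // 0 threads means one per core, for this scope.
//     PrintTask a, b;
//     SkTaskGroup tg;
//     tg.add(&a);  // Queued to the pool (or run synchronously with no Enabler).
//     tg.add(&b);
//     tg.wait();   // Helps drain the queue until both tasks have run.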