1/*
2 * Copyright (C) 2015 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "WorkerPool.h"
18//#include <atomic>
19#include <stdlib.h>
20#include <unistd.h>
21#include <errno.h>
22#include <android/log.h>
23
24
25//static pthread_key_t gThreadTLSKey = 0;
26//static uint32_t gThreadTLSKeyCount = 0;
27//static pthread_mutex_t gInitMutex = PTHREAD_MUTEX_INITIALIZER;
28
29
30WorkerPool::Signal::Signal() {
31    mSet = true;
32}
33
34WorkerPool::Signal::~Signal() {
35    pthread_mutex_destroy(&mMutex);
36    pthread_cond_destroy(&mCondition);
37}
38
39bool WorkerPool::Signal::init() {
40    int status = pthread_mutex_init(&mMutex, NULL);
41    if (status) {
42        __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool mutex init failure");
43        return false;
44    }
45
46    status = pthread_cond_init(&mCondition, NULL);
47    if (status) {
48        __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool condition init failure");
49        pthread_mutex_destroy(&mMutex);
50        return false;
51    }
52
53    return true;
54}
55
56void WorkerPool::Signal::set() {
57    int status;
58
59    status = pthread_mutex_lock(&mMutex);
60    if (status) {
61        __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i locking for set condition.", status);
62        return;
63    }
64
65    mSet = true;
66
67    status = pthread_cond_signal(&mCondition);
68    if (status) {
69        __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i on set condition.", status);
70    }
71
72    status = pthread_mutex_unlock(&mMutex);
73    if (status) {
74        __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i unlocking for set condition.", status);
75    }
76}
77
78bool WorkerPool::Signal::wait(uint64_t timeout) {
79    int status;
80    bool ret = false;
81
82    status = pthread_mutex_lock(&mMutex);
83    if (status) {
84        __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i locking for condition.", status);
85        return false;
86    }
87
88    if (!mSet) {
89        if (!timeout) {
90            status = pthread_cond_wait(&mCondition, &mMutex);
91        } else {
92#if defined(HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE)
93            status = pthread_cond_timeout_np(&mCondition, &mMutex, timeout / 1000000);
94#else
95            // This is safe it will just make things less reponsive
96            status = pthread_cond_wait(&mCondition, &mMutex);
97#endif
98        }
99    }
100
101    if (!status) {
102        mSet = false;
103        ret = true;
104    } else {
105#ifndef RS_SERVER
106        if (status != ETIMEDOUT) {
107            __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i waiting for condition.", status);
108        }
109#endif
110    }
111
112    status = pthread_mutex_unlock(&mMutex);
113    if (status) {
114        __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i unlocking for condition.", status);
115    }
116
117    return ret;
118}
119
120
121
122WorkerPool::WorkerPool() {
123    mExit = false;
124    mRunningCount = 0;
125    mLaunchCount = 0;
126    mCount = 0;
127    mThreadId = NULL;
128    mNativeThreadId = NULL;
129    mLaunchSignals = NULL;
130    mLaunchCallback = NULL;
131
132
133}
134
135
136WorkerPool::~WorkerPool() {
137__android_log_print(ANDROID_LOG_INFO, "bench", "~wp");
138    mExit = true;
139    mLaunchData = NULL;
140    mLaunchCallback = NULL;
141    mRunningCount = mCount;
142
143    __sync_synchronize();
144    for (uint32_t ct = 0; ct < mCount; ct++) {
145        mLaunchSignals[ct].set();
146    }
147    void *res;
148    for (uint32_t ct = 0; ct < mCount; ct++) {
149        pthread_join(mThreadId[ct], &res);
150    }
151    //rsAssert(__sync_fetch_and_or(&mRunningCount, 0) == 0);
152    free(mThreadId);
153    free(mNativeThreadId);
154    delete[] mLaunchSignals;
155}
156
157bool WorkerPool::init(int threadCount) {
158    int cpu = sysconf(_SC_NPROCESSORS_CONF);
159    if (threadCount > 0) {
160        cpu = threadCount;
161    }
162    if (cpu < 1) {
163        return false;
164    }
165    mCount = (uint32_t)cpu;
166
167    __android_log_print(ANDROID_LOG_INFO, "Bench", "ThreadLaunch %i", mCount);
168
169    mThreadId = (pthread_t *) calloc(mCount, sizeof(pthread_t));
170    mNativeThreadId = (pid_t *) calloc(mCount, sizeof(pid_t));
171    mLaunchSignals = new Signal[mCount];
172    mLaunchCallback = NULL;
173
174    mCompleteSignal.init();
175    mRunningCount = mCount;
176    mLaunchCount = 0;
177    __sync_synchronize();
178
179    pthread_attr_t threadAttr;
180    int status = pthread_attr_init(&threadAttr);
181    if (status) {
182        __android_log_print(ANDROID_LOG_INFO, "bench", "Failed to init thread attribute.");
183        return false;
184    }
185
186    for (uint32_t ct=0; ct < mCount; ct++) {
187        status = pthread_create(&mThreadId[ct], &threadAttr, helperThreadProc, this);
188        if (status) {
189            mCount = ct;
190            __android_log_print(ANDROID_LOG_INFO, "bench", "Created fewer than expected number of threads.");
191            return false;
192        }
193    }
194    while (__sync_fetch_and_or(&mRunningCount, 0) != 0) {
195        usleep(100);
196    }
197
198    pthread_attr_destroy(&threadAttr);
199    return true;
200}
201
202void * WorkerPool::helperThreadProc(void *vwp) {
203    WorkerPool *wp = (WorkerPool *)vwp;
204
205    uint32_t idx = __sync_fetch_and_add(&wp->mLaunchCount, 1);
206
207    wp->mLaunchSignals[idx].init();
208    wp->mNativeThreadId[idx] = gettid();
209
210    while (!wp->mExit) {
211        wp->mLaunchSignals[idx].wait();
212        if (wp->mLaunchCallback) {
213           // idx +1 is used because the calling thread is always worker 0.
214           wp->mLaunchCallback(wp->mLaunchData, idx);
215        }
216        __sync_fetch_and_sub(&wp->mRunningCount, 1);
217        wp->mCompleteSignal.set();
218    }
219
220    //ALOGV("RS helperThread exited %p idx=%i", dc, idx);
221    return NULL;
222}
223
224
225void WorkerPool::waitForAll() const {
226}
227
228void WorkerPool::waitFor(uint64_t) const {
229}
230
231
232
233uint64_t WorkerPool::launchWork(WorkerCallback_t cb, void *usr, int maxThreads) {
234    //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 1");
235    mLaunchData = usr;
236    mLaunchCallback = cb;
237
238    if (maxThreads < 1) {
239        maxThreads = mCount;
240    }
241    if ((uint32_t)maxThreads > mCount) {
242        //__android_log_print(ANDROID_LOG_INFO, "bench", "launchWork max > count", maxThreads, mCount);
243        maxThreads = mCount;
244    }
245
246    //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 2  %i  %i  %i", maxThreads, mRunningCount, mCount);
247    mRunningCount = maxThreads;
248    __sync_synchronize();
249
250    for (int ct = 0; ct < maxThreads; ct++) {
251        mLaunchSignals[ct].set();
252    }
253
254    //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 3    %i", mRunningCount);
255    while (__sync_fetch_and_or(&mRunningCount, 0) != 0) {
256        //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 3.1    %i", mRunningCount);
257        mCompleteSignal.wait();
258    }
259
260    //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 4    %i", mRunningCount);
261    return 0;
262
263}
264
265
266
267