1/* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "WorkerPool.h" 18//#include <atomic> 19#include <stdlib.h> 20#include <unistd.h> 21#include <errno.h> 22#include <android/log.h> 23 24 25//static pthread_key_t gThreadTLSKey = 0; 26//static uint32_t gThreadTLSKeyCount = 0; 27//static pthread_mutex_t gInitMutex = PTHREAD_MUTEX_INITIALIZER; 28 29 30WorkerPool::Signal::Signal() { 31 mSet = true; 32} 33 34WorkerPool::Signal::~Signal() { 35 pthread_mutex_destroy(&mMutex); 36 pthread_cond_destroy(&mCondition); 37} 38 39bool WorkerPool::Signal::init() { 40 int status = pthread_mutex_init(&mMutex, NULL); 41 if (status) { 42 __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool mutex init failure"); 43 return false; 44 } 45 46 status = pthread_cond_init(&mCondition, NULL); 47 if (status) { 48 __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool condition init failure"); 49 pthread_mutex_destroy(&mMutex); 50 return false; 51 } 52 53 return true; 54} 55 56void WorkerPool::Signal::set() { 57 int status; 58 59 status = pthread_mutex_lock(&mMutex); 60 if (status) { 61 __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i locking for set condition.", status); 62 return; 63 } 64 65 mSet = true; 66 67 status = pthread_cond_signal(&mCondition); 68 if (status) { 69 __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i on set condition.", status); 70 } 71 72 status = pthread_mutex_unlock(&mMutex); 73 if (status) { 74 __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i unlocking for set condition.", status); 75 } 76} 77 78bool WorkerPool::Signal::wait(uint64_t timeout) { 79 int status; 80 bool ret = false; 81 82 status = pthread_mutex_lock(&mMutex); 83 if (status) { 84 __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i locking for condition.", status); 85 return false; 86 } 87 88 if (!mSet) { 89 if (!timeout) { 90 status = pthread_cond_wait(&mCondition, &mMutex); 91 } else { 92#if defined(HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE) 93 status = pthread_cond_timeout_np(&mCondition, &mMutex, timeout / 1000000); 94#else 95 // This is safe it will just make things less reponsive 96 status = pthread_cond_wait(&mCondition, &mMutex); 97#endif 98 } 99 } 100 101 if (!status) { 102 mSet = false; 103 ret = true; 104 } else { 105#ifndef RS_SERVER 106 if (status != ETIMEDOUT) { 107 __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i waiting for condition.", status); 108 } 109#endif 110 } 111 112 status = pthread_mutex_unlock(&mMutex); 113 if (status) { 114 __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i unlocking for condition.", status); 115 } 116 117 return ret; 118} 119 120 121 122WorkerPool::WorkerPool() { 123 mExit = false; 124 mRunningCount = 0; 125 mLaunchCount = 0; 126 mCount = 0; 127 mThreadId = NULL; 128 mNativeThreadId = NULL; 129 mLaunchSignals = NULL; 130 mLaunchCallback = NULL; 131 132 133} 134 135 136WorkerPool::~WorkerPool() { 137__android_log_print(ANDROID_LOG_INFO, "bench", "~wp"); 138 mExit = true; 139 mLaunchData = NULL; 140 mLaunchCallback = NULL; 141 mRunningCount = mCount; 142 143 __sync_synchronize(); 144 for (uint32_t ct = 0; ct < mCount; ct++) { 145 mLaunchSignals[ct].set(); 146 } 147 void *res; 148 for (uint32_t ct = 0; ct < mCount; ct++) { 149 pthread_join(mThreadId[ct], &res); 150 } 151 //rsAssert(__sync_fetch_and_or(&mRunningCount, 0) == 0); 152 free(mThreadId); 153 free(mNativeThreadId); 154 delete[] mLaunchSignals; 155} 156 157bool WorkerPool::init(int threadCount) { 158 int cpu = sysconf(_SC_NPROCESSORS_CONF); 159 if (threadCount > 0) { 160 cpu = threadCount; 161 } 162 if (cpu < 1) { 163 return false; 164 } 165 mCount = (uint32_t)cpu; 166 167 __android_log_print(ANDROID_LOG_INFO, "Bench", "ThreadLaunch %i", mCount); 168 169 mThreadId = (pthread_t *) calloc(mCount, sizeof(pthread_t)); 170 mNativeThreadId = (pid_t *) calloc(mCount, sizeof(pid_t)); 171 mLaunchSignals = new Signal[mCount]; 172 mLaunchCallback = NULL; 173 174 mCompleteSignal.init(); 175 mRunningCount = mCount; 176 mLaunchCount = 0; 177 __sync_synchronize(); 178 179 pthread_attr_t threadAttr; 180 int status = pthread_attr_init(&threadAttr); 181 if (status) { 182 __android_log_print(ANDROID_LOG_INFO, "bench", "Failed to init thread attribute."); 183 return false; 184 } 185 186 for (uint32_t ct=0; ct < mCount; ct++) { 187 status = pthread_create(&mThreadId[ct], &threadAttr, helperThreadProc, this); 188 if (status) { 189 mCount = ct; 190 __android_log_print(ANDROID_LOG_INFO, "bench", "Created fewer than expected number of threads."); 191 return false; 192 } 193 } 194 while (__sync_fetch_and_or(&mRunningCount, 0) != 0) { 195 usleep(100); 196 } 197 198 pthread_attr_destroy(&threadAttr); 199 return true; 200} 201 202void * WorkerPool::helperThreadProc(void *vwp) { 203 WorkerPool *wp = (WorkerPool *)vwp; 204 205 uint32_t idx = __sync_fetch_and_add(&wp->mLaunchCount, 1); 206 207 wp->mLaunchSignals[idx].init(); 208 wp->mNativeThreadId[idx] = gettid(); 209 210 while (!wp->mExit) { 211 wp->mLaunchSignals[idx].wait(); 212 if (wp->mLaunchCallback) { 213 // idx +1 is used because the calling thread is always worker 0. 214 wp->mLaunchCallback(wp->mLaunchData, idx); 215 } 216 __sync_fetch_and_sub(&wp->mRunningCount, 1); 217 wp->mCompleteSignal.set(); 218 } 219 220 //ALOGV("RS helperThread exited %p idx=%i", dc, idx); 221 return NULL; 222} 223 224 225void WorkerPool::waitForAll() const { 226} 227 228void WorkerPool::waitFor(uint64_t) const { 229} 230 231 232 233uint64_t WorkerPool::launchWork(WorkerCallback_t cb, void *usr, int maxThreads) { 234 //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 1"); 235 mLaunchData = usr; 236 mLaunchCallback = cb; 237 238 if (maxThreads < 1) { 239 maxThreads = mCount; 240 } 241 if ((uint32_t)maxThreads > mCount) { 242 //__android_log_print(ANDROID_LOG_INFO, "bench", "launchWork max > count", maxThreads, mCount); 243 maxThreads = mCount; 244 } 245 246 //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 2 %i %i %i", maxThreads, mRunningCount, mCount); 247 mRunningCount = maxThreads; 248 __sync_synchronize(); 249 250 for (int ct = 0; ct < maxThreads; ct++) { 251 mLaunchSignals[ct].set(); 252 } 253 254 //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 3 %i", mRunningCount); 255 while (__sync_fetch_and_or(&mRunningCount, 0) != 0) { 256 //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 3.1 %i", mRunningCount); 257 mCompleteSignal.wait(); 258 } 259 260 //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 4 %i", mRunningCount); 261 return 0; 262 263} 264 265 266 267