ThreadPool.cpp revision d2a7f0d6883a6d3835642e7b282f05ed1c54fe63
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17/* ThreadPool */ 18 19#include "sles_allinclusive.h" 20 21// Entry point for each worker thread 22 23static void *ThreadPool_start(void *context) 24{ 25 ThreadPool *tp = (ThreadPool *) context; 26 assert(NULL != tp); 27 for (;;) { 28 Closure *pClosure = ThreadPool_remove(tp); 29 // closure is NULL when engine is being destroyed 30 if (NULL == pClosure) 31 break; 32 void (*handler)(Closure *); 33 handler = pClosure->mHandler; 34 assert(NULL != handler); 35 (*handler)(pClosure); 36 } 37 return NULL; 38} 39 40#define INITIALIZED_NONE 0 41#define INITIALIZED_MUTEX 1 42#define INITIALIZED_CONDNOTFULL 2 43#define INITIALIZED_CONDNOTEMPTY 4 44#define INITIALIZED_ALL 7 45 46static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads); 47 48// Initialize a ThreadPool 49// maxClosures defaults to CLOSURE_TYPICAL if 0 50// maxThreads defaults to THREAD_TYPICAL if 0 51 52SLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads) 53{ 54 assert(NULL != tp); 55 memset(tp, 0, sizeof(ThreadPool)); 56 tp->mShutdown = SL_BOOLEAN_FALSE; 57 unsigned initialized = INITIALIZED_NONE; 58 unsigned nThreads = 0; 59 SLresult result; 60 result = err_to_result(pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL)); 61 if (SL_RESULT_SUCCESS != result) 62 goto fail; 63 initialized |= INITIALIZED_MUTEX; 64 result = err_to_result(pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL)); 65 if (SL_RESULT_SUCCESS != result) 66 goto fail; 67 initialized |= INITIALIZED_CONDNOTFULL; 68 result = err_to_result(pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL)); 69 if (SL_RESULT_SUCCESS != result) 70 goto fail; 71 initialized |= INITIALIZED_CONDNOTEMPTY; 72 tp->mWaitingNotFull = 0; 73 tp->mWaitingNotEmpty = 0; 74 if (0 == maxClosures) 75 maxClosures = CLOSURE_TYPICAL; 76 tp->mMaxClosures = maxClosures; 77 if (0 == maxThreads) 78 maxThreads = THREAD_TYPICAL; 79 tp->mMaxThreads = maxThreads; 80 if (CLOSURE_TYPICAL >= maxClosures) { 81 tp->mClosureArray = tp->mClosureTypical; 82 } else { 83 tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *)); 84 if (NULL == tp->mClosureArray) { 85 result = SL_RESULT_RESOURCE_ERROR; 86 goto fail; 87 } 88 } 89 tp->mClosureFront = tp->mClosureArray; 90 tp->mClosureRear = tp->mClosureArray; 91 if (THREAD_TYPICAL >= maxThreads) { 92 tp->mThreadArray = tp->mThreadTypical; 93 } else { 94 tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t)); 95 if (NULL == tp->mThreadArray) { 96 result = SL_RESULT_RESOURCE_ERROR; 97 goto fail; 98 } 99 } 100 unsigned i; 101 for (i = 0; i < maxThreads; ++i) { 102 result = err_to_result(pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL, ThreadPool_start, tp)); 103 if (SL_RESULT_SUCCESS != result) 104 goto fail; 105 ++nThreads; 106 } 107 tp->mInitialized = initialized; 108 return SL_RESULT_SUCCESS; 109fail: 110 ThreadPool_deinit_internal(tp, initialized, nThreads); 111 return result; 112} 113 114static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads) 115{ 116 assert(NULL != tp); 117 // FIXME Cancel all pending operations 118 // Destroy all threads 119 if (0 < nThreads) { 120 int ok; 121 assert(INITIALIZED_ALL == initialized); 122 ok = pthread_mutex_lock(&tp->mMutex); 123 assert(0 == ok); 124 tp->mShutdown = SL_BOOLEAN_TRUE; 125 ok = pthread_cond_broadcast(&tp->mCondNotEmpty); 126 assert(0 == ok); 127 ok = pthread_cond_broadcast(&tp->mCondNotFull); 128 assert(0 == ok); 129 ok = pthread_mutex_unlock(&tp->mMutex); 130 assert(0 == ok); 131 unsigned i; 132 for (i = 0; i < nThreads; ++i) { 133 ok = pthread_join(tp->mThreadArray[i], (void **) NULL); 134 assert(ok == 0); 135 } 136 ok = pthread_mutex_lock(&tp->mMutex); 137 assert(0 == ok); 138 assert(0 == tp->mWaitingNotEmpty); 139 ok = pthread_mutex_unlock(&tp->mMutex); 140 assert(0 == ok); 141 // Note that we can't be sure when mWaitingNotFull will drop to zero 142 } 143 if (initialized & INITIALIZED_CONDNOTEMPTY) 144 (void) pthread_cond_destroy(&tp->mCondNotEmpty); 145 if (initialized & INITIALIZED_CONDNOTFULL) 146 (void) pthread_cond_destroy(&tp->mCondNotFull); 147 if (initialized & INITIALIZED_MUTEX) 148 (void) pthread_mutex_destroy(&tp->mMutex); 149 tp->mInitialized = INITIALIZED_NONE; 150 if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) { 151 free(tp->mClosureArray); 152 tp->mClosureArray = NULL; 153 } 154 if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) { 155 free(tp->mThreadArray); 156 tp->mThreadArray = NULL; 157 } 158} 159 160void ThreadPool_deinit(ThreadPool *tp) 161{ 162 ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads); 163} 164 165bool ThreadPool_add(ThreadPool *tp, Closure *closure) 166{ 167 assert(NULL != tp); 168 assert(NULL != closure); 169 int ok; 170 ok = pthread_mutex_lock(&tp->mMutex); 171 assert(0 == ok); 172 for (;;) { 173 Closure **oldRear = tp->mClosureRear; 174 Closure **newRear = oldRear; 175 if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1]) 176 newRear = tp->mClosureArray; 177 if (newRear == tp->mClosureFront) { 178 ++tp->mWaitingNotFull; 179 ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex); 180 assert(0 == ok); 181 if (tp->mShutdown) { 182 assert(0 < tp->mWaitingNotFull); 183 --tp->mWaitingNotFull; 184 ok = pthread_mutex_unlock(&tp->mMutex); 185 assert(0 == ok); 186 return false; 187 } 188 continue; 189 } 190 assert(NULL == *oldRear); 191 *oldRear = closure; 192 if (0 < tp->mWaitingNotEmpty) { 193 --tp->mWaitingNotEmpty; 194 ok = pthread_cond_signal(&tp->mCondNotEmpty); 195 assert(0 == ok); 196 } 197 break; 198 } 199 ok = pthread_mutex_unlock(&tp->mMutex); 200 assert(0 == ok); 201 return true; 202} 203 204Closure *ThreadPool_remove(ThreadPool *tp) 205{ 206 Closure *pClosure; 207 int ok; 208 ok = pthread_mutex_lock(&tp->mMutex); 209 assert(0 == ok); 210 for (;;) { 211 Closure **oldFront = tp->mClosureFront; 212 if (oldFront == tp->mClosureRear) { 213 ++tp->mWaitingNotEmpty; 214 ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex); 215 assert(0 == ok); 216 if (tp->mShutdown) { 217 assert(0 < tp->mWaitingNotEmpty); 218 --tp->mWaitingNotEmpty; 219 pClosure = NULL; 220 break; 221 } 222 continue; 223 } 224 Closure **newFront = oldFront; 225 if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1]) 226 newFront = tp->mClosureArray; 227 pClosure = *oldFront; 228 tp->mClosureFront = newFront; 229 if (0 < tp->mWaitingNotFull) { 230 --tp->mWaitingNotFull; 231 ok = pthread_cond_signal(&tp->mCondNotFull); 232 assert(0 == ok); 233 } 234 break; 235 } 236 ok = pthread_mutex_unlock(&tp->mMutex); 237 assert(0 == ok); 238 return pClosure; 239} 240