ThreadPool.cpp revision 4597a7427b697df31d0bbf4c2040806d0c27b6e0
1d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten/* 2d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * Copyright (C) 2010 The Android Open Source Project 3d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * 4d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * Licensed under the Apache License, Version 2.0 (the "License"); 5d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * you may not use this file except in compliance with the License. 6d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * You may obtain a copy of the License at 7d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * 8d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * http://www.apache.org/licenses/LICENSE-2.0 9d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * 10d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * Unless required by applicable law or agreed to in writing, software 11d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * distributed under the License is distributed on an "AS IS" BASIS, 12d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * See the License for the specific language governing permissions and 14d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * limitations under the License. 15d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten */ 16d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 17d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten/* ThreadPool */ 18d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 19d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#include "sles_allinclusive.h" 20d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 21d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten// Entry point for each worker thread 22d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 23d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenstatic void *ThreadPool_start(void *context) 24d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{ 25d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ThreadPool *tp = (ThreadPool *) context; 26d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(NULL != tp); 27d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten for (;;) { 28d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten Closure *pClosure = ThreadPool_remove(tp); 29510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // closure is NULL when thread pool is being destroyed 30d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (NULL == pClosure) 31d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten break; 32510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten void (*handler)(void *, int); 33d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten handler = pClosure->mHandler; 34510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten void *context = pClosure->mContext; 35510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten int parameter = pClosure->mParameter; 36510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten free(pClosure); 37d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(NULL != handler); 38510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten (*handler)(context, parameter); 39d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 40d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten return NULL; 41d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten} 42d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 43d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_NONE 0 44d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_MUTEX 1 45d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_CONDNOTFULL 2 46d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_CONDNOTEMPTY 4 47d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_ALL 7 48d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 49d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenstatic void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads); 50d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 51d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten// Initialize a ThreadPool 52d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten// maxClosures defaults to CLOSURE_TYPICAL if 0 53d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten// maxThreads defaults to THREAD_TYPICAL if 0 54d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 55d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn KastenSLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads) 56d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{ 57d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(NULL != tp); 58d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten memset(tp, 0, sizeof(ThreadPool)); 59d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mShutdown = SL_BOOLEAN_FALSE; 60510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten unsigned initialized = INITIALIZED_NONE; // which objects were successfully initialized 61510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten unsigned nThreads = 0; // number of threads successfully created 62510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten int err; 63d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten SLresult result; 64510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 65510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // initialize mutex and condition variables 66510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten err = pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL); 67510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten result = err_to_result(err); 68d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (SL_RESULT_SUCCESS != result) 69d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten goto fail; 70d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten initialized |= INITIALIZED_MUTEX; 71510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten err = pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL); 72510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten result = err_to_result(err); 73d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (SL_RESULT_SUCCESS != result) 74d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten goto fail; 75d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten initialized |= INITIALIZED_CONDNOTFULL; 76510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten err = pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL); 77510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten result = err_to_result(err); 78d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (SL_RESULT_SUCCESS != result) 79d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten goto fail; 80d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten initialized |= INITIALIZED_CONDNOTEMPTY; 81510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 82510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // use default values for parameters, if not specified explicitly 83d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mWaitingNotFull = 0; 84d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mWaitingNotEmpty = 0; 85d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (0 == maxClosures) 86d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten maxClosures = CLOSURE_TYPICAL; 87d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mMaxClosures = maxClosures; 88d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (0 == maxThreads) 89d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten maxThreads = THREAD_TYPICAL; 90d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mMaxThreads = maxThreads; 91510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 92510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // initialize circular buffer for closures 93d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (CLOSURE_TYPICAL >= maxClosures) { 94d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mClosureArray = tp->mClosureTypical; 95d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } else { 96d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *)); 97d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (NULL == tp->mClosureArray) { 98d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten result = SL_RESULT_RESOURCE_ERROR; 99d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten goto fail; 100d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 101d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 102d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mClosureFront = tp->mClosureArray; 103d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mClosureRear = tp->mClosureArray; 104510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 105510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // initialize thread pool 106d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (THREAD_TYPICAL >= maxThreads) { 107d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mThreadArray = tp->mThreadTypical; 108d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } else { 109d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t)); 110d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (NULL == tp->mThreadArray) { 111d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten result = SL_RESULT_RESOURCE_ERROR; 112d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten goto fail; 113d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 114d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 115d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten unsigned i; 116d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten for (i = 0; i < maxThreads; ++i) { 117510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten int err = pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL, 118510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten ThreadPool_start, tp); 119510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten result = err_to_result(err); 120d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (SL_RESULT_SUCCESS != result) 121d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten goto fail; 122d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ++nThreads; 123d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 124d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mInitialized = initialized; 125510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 126510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // done 127d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten return SL_RESULT_SUCCESS; 128510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 129510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // here on any kind of error 130d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenfail: 131d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ThreadPool_deinit_internal(tp, initialized, nThreads); 132d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten return result; 133d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten} 134d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 135d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenstatic void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads) 136d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{ 1374597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten int ok; 1384597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten 139d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(NULL != tp); 140d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten // Destroy all threads 141d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (0 < nThreads) { 142d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(INITIALIZED_ALL == initialized); 143d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_mutex_lock(&tp->mMutex); 144d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 145d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mShutdown = SL_BOOLEAN_TRUE; 146d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_cond_broadcast(&tp->mCondNotEmpty); 147d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 148d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_cond_broadcast(&tp->mCondNotFull); 149d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 150d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_mutex_unlock(&tp->mMutex); 151d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 152d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten unsigned i; 153d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten for (i = 0; i < nThreads; ++i) { 154d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_join(tp->mThreadArray[i], (void **) NULL); 155d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(ok == 0); 156d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 157510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 158510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // Empty out the circular buffer of closures 159d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_mutex_lock(&tp->mMutex); 160d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 161d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == tp->mWaitingNotEmpty); 162510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten Closure **oldFront = tp->mClosureFront; 163510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten while (oldFront != tp->mClosureRear) { 164510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten Closure **newFront = oldFront; 165510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1]) 166510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten newFront = tp->mClosureArray; 167510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten Closure *pClosure = *oldFront; 168510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten assert(NULL != pClosure); 169510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten *oldFront = NULL; 170510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten tp->mClosureFront = newFront; 171510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten ok = pthread_mutex_unlock(&tp->mMutex); 172510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten assert(0 == ok); 173510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten free(pClosure); 174510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten ok = pthread_mutex_lock(&tp->mMutex); 175510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten assert(0 == ok); 176510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten } 177d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_mutex_unlock(&tp->mMutex); 178d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 179d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten // Note that we can't be sure when mWaitingNotFull will drop to zero 180d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 181510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 182510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // destroy the mutex and condition variables 1834597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten if (initialized & INITIALIZED_CONDNOTEMPTY) { 1844597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten ok = pthread_cond_destroy(&tp->mCondNotEmpty); 1854597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten assert(0 == ok); 1864597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten } 1874597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten if (initialized & INITIALIZED_CONDNOTFULL) { 1884597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten ok = pthread_cond_destroy(&tp->mCondNotFull); 1894597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten assert(0 == ok); 1904597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten } 1914597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten if (initialized & INITIALIZED_MUTEX) { 1924597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten ok = pthread_mutex_destroy(&tp->mMutex); 1934597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten assert(0 == ok); 1944597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten } 195d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mInitialized = INITIALIZED_NONE; 196510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 197510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // release the closure circular buffer 198d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) { 199d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten free(tp->mClosureArray); 200d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mClosureArray = NULL; 201d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 202510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 203510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // release the thread pool 204d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) { 205d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten free(tp->mThreadArray); 206d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mThreadArray = NULL; 207d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 208510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten 209d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten} 210d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 211d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenvoid ThreadPool_deinit(ThreadPool *tp) 212d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{ 213d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads); 214d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten} 215d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 216510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten// Enqueue a closure to be executed later by a worker thread 217510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn KastenSLresult ThreadPool_add(ThreadPool *tp, void (*handler)(void *, int), void *context, int parameter) 218d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{ 219d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(NULL != tp); 220510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten assert(NULL != handler); 221510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten Closure *closure = (Closure *) malloc(sizeof(Closure)); 222510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten if (NULL == closure) 223510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten return SL_RESULT_RESOURCE_ERROR; 224510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten closure->mHandler = handler; 225510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten closure->mContext = context; 226510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten closure->mParameter = parameter; 227d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten int ok; 228d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_mutex_lock(&tp->mMutex); 229d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 230510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // can't enqueue while thread pool shutting down 231510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten if (tp->mShutdown) { 232510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten ok = pthread_mutex_unlock(&tp->mMutex); 233510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten assert(0 == ok); 234510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten free(closure); 235510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten return SL_RESULT_PRECONDITIONS_VIOLATED; 236510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten } 237d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten for (;;) { 238d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten Closure **oldRear = tp->mClosureRear; 239d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten Closure **newRear = oldRear; 240d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1]) 241d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten newRear = tp->mClosureArray; 242510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // if closure circular buffer is full, then wait for it to become non-full 243d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (newRear == tp->mClosureFront) { 244d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ++tp->mWaitingNotFull; 245d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex); 246d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 247510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // can't enqueue while thread pool shutting down 248d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (tp->mShutdown) { 249d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 < tp->mWaitingNotFull); 250d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten --tp->mWaitingNotFull; 251d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_mutex_unlock(&tp->mMutex); 252d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 253510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten free(closure); 254510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten return SL_RESULT_PRECONDITIONS_VIOLATED; 255d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 256d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten continue; 257d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 258d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(NULL == *oldRear); 259d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten *oldRear = closure; 260d4b099b83a686a7bb88f5e36c063f5efe623b56dGlenn Kasten tp->mClosureRear = newRear; 261510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // if a worker thread was waiting to dequeue, then suggest that it try again 262d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (0 < tp->mWaitingNotEmpty) { 263d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten --tp->mWaitingNotEmpty; 264d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_cond_signal(&tp->mCondNotEmpty); 265d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 266d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 267d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten break; 268d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 269d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_mutex_unlock(&tp->mMutex); 270d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 271510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten return SL_RESULT_SUCCESS; 272d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten} 273d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten 274510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten// Called by a worker thread when it is ready to accept the next closure to execute 275d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn KastenClosure *ThreadPool_remove(ThreadPool *tp) 276d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{ 277d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten Closure *pClosure; 278d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten int ok; 279d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_mutex_lock(&tp->mMutex); 280d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 281d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten for (;;) { 282d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten Closure **oldFront = tp->mClosureFront; 283510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // if closure circular buffer is empty, then wait for it to become non-empty 284d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (oldFront == tp->mClosureRear) { 285d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ++tp->mWaitingNotEmpty; 286d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex); 287d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 288510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // fail if thread pool is shutting down 289d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (tp->mShutdown) { 290d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 < tp->mWaitingNotEmpty); 291d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten --tp->mWaitingNotEmpty; 292d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten pClosure = NULL; 293d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten break; 294d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 295510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // try again 296d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten continue; 297d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 298510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // dequeue the closure at front of circular buffer 299d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten Closure **newFront = oldFront; 300d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1]) 301d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten newFront = tp->mClosureArray; 302d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten pClosure = *oldFront; 303510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten assert(NULL != pClosure); 304510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten *oldFront = NULL; 305d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten tp->mClosureFront = newFront; 306510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten // if a client thread was waiting to enqueue, then suggest that it try again 307d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten if (0 < tp->mWaitingNotFull) { 308d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten --tp->mWaitingNotFull; 309d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_cond_signal(&tp->mCondNotFull); 310d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 311d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 312d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten break; 313d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten } 314d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten ok = pthread_mutex_unlock(&tp->mMutex); 315d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten assert(0 == ok); 316d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten return pClosure; 317d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten} 318