ThreadPool.c revision 262059f71a68edc5e510427c63f5f1623d3672a8
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/* ThreadPool */
18
19#include "sles_allinclusive.h"
20
21// Entry point for each worker thread
22
23static void *ThreadPool_start(void *context)
24{
25    ThreadPool *tp = (ThreadPool *) context;
26    assert(NULL != tp);
27    for (;;) {
28        Closure *pClosure = ThreadPool_remove(tp);
29        // closure is NULL when thread pool is being destroyed
30        if (NULL == pClosure)
31            break;
32        void (*handler)(void *, int);
33        handler = pClosure->mHandler;
34        void *context = pClosure->mContext;
35        int parameter = pClosure->mParameter;
36        free(pClosure);
37        assert(NULL != handler);
38        (*handler)(context, parameter);
39    }
40    return NULL;
41}
42
43#define INITIALIZED_NONE         0
44#define INITIALIZED_MUTEX        1
45#define INITIALIZED_CONDNOTFULL  2
46#define INITIALIZED_CONDNOTEMPTY 4
47#define INITIALIZED_ALL          7
48
49static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads);
50
51// Initialize a ThreadPool
52// maxClosures defaults to CLOSURE_TYPICAL if 0
53// maxThreads defaults to THREAD_TYPICAL if 0
54
55SLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads)
56{
57    assert(NULL != tp);
58    memset(tp, 0, sizeof(ThreadPool));
59    tp->mShutdown = SL_BOOLEAN_FALSE;
60    unsigned initialized = INITIALIZED_NONE;    // which objects were successfully initialized
61    unsigned nThreads = 0;                      // number of threads successfully created
62    int err;
63    SLresult result;
64
65    // initialize mutex and condition variables
66    err = pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL);
67    result = err_to_result(err);
68    if (SL_RESULT_SUCCESS != result)
69        goto fail;
70    initialized |= INITIALIZED_MUTEX;
71    err = pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL);
72    result = err_to_result(err);
73    if (SL_RESULT_SUCCESS != result)
74        goto fail;
75    initialized |= INITIALIZED_CONDNOTFULL;
76    err = pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL);
77    result = err_to_result(err);
78    if (SL_RESULT_SUCCESS != result)
79        goto fail;
80    initialized |= INITIALIZED_CONDNOTEMPTY;
81
82    // use default values for parameters, if not specified explicitly
83    tp->mWaitingNotFull = 0;
84    tp->mWaitingNotEmpty = 0;
85    if (0 == maxClosures)
86        maxClosures = CLOSURE_TYPICAL;
87    tp->mMaxClosures = maxClosures;
88    if (0 == maxThreads)
89        maxThreads = THREAD_TYPICAL;
90    tp->mMaxThreads = maxThreads;
91
92    // initialize circular buffer for closures
93    if (CLOSURE_TYPICAL >= maxClosures) {
94        tp->mClosureArray = tp->mClosureTypical;
95    } else {
96        tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *));
97        if (NULL == tp->mClosureArray) {
98            result = SL_RESULT_RESOURCE_ERROR;
99            goto fail;
100        }
101    }
102    tp->mClosureFront = tp->mClosureArray;
103    tp->mClosureRear = tp->mClosureArray;
104
105    // initialize thread pool
106    if (THREAD_TYPICAL >= maxThreads) {
107        tp->mThreadArray = tp->mThreadTypical;
108    } else {
109        tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t));
110        if (NULL == tp->mThreadArray) {
111            result = SL_RESULT_RESOURCE_ERROR;
112            goto fail;
113        }
114    }
115    unsigned i;
116    for (i = 0; i < maxThreads; ++i) {
117        int err = pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL,
118            ThreadPool_start, tp);
119        result = err_to_result(err);
120        if (SL_RESULT_SUCCESS != result)
121            goto fail;
122        ++nThreads;
123    }
124    tp->mInitialized = initialized;
125
126    // done
127    return SL_RESULT_SUCCESS;
128
129    // here on any kind of error
130fail:
131    ThreadPool_deinit_internal(tp, initialized, nThreads);
132    return result;
133}
134
135static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads)
136{
137    int ok;
138
139    assert(NULL != tp);
140    // Destroy all threads
141    if (0 < nThreads) {
142        assert(INITIALIZED_ALL == initialized);
143        ok = pthread_mutex_lock(&tp->mMutex);
144        assert(0 == ok);
145        tp->mShutdown = SL_BOOLEAN_TRUE;
146        ok = pthread_cond_broadcast(&tp->mCondNotEmpty);
147        assert(0 == ok);
148        ok = pthread_cond_broadcast(&tp->mCondNotFull);
149        assert(0 == ok);
150        ok = pthread_mutex_unlock(&tp->mMutex);
151        assert(0 == ok);
152        unsigned i;
153        for (i = 0; i < nThreads; ++i) {
154            ok = pthread_join(tp->mThreadArray[i], (void **) NULL);
155            assert(ok == 0);
156        }
157
158        // Empty out the circular buffer of closures
159        ok = pthread_mutex_lock(&tp->mMutex);
160        assert(0 == ok);
161        assert(0 == tp->mWaitingNotEmpty);
162        Closure **oldFront = tp->mClosureFront;
163        while (oldFront != tp->mClosureRear) {
164            Closure **newFront = oldFront;
165            if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
166                newFront = tp->mClosureArray;
167            Closure *pClosure = *oldFront;
168            assert(NULL != pClosure);
169            *oldFront = NULL;
170            tp->mClosureFront = newFront;
171            ok = pthread_mutex_unlock(&tp->mMutex);
172            assert(0 == ok);
173            free(pClosure);
174            ok = pthread_mutex_lock(&tp->mMutex);
175            assert(0 == ok);
176        }
177        ok = pthread_mutex_unlock(&tp->mMutex);
178        assert(0 == ok);
179        // Note that we can't be sure when mWaitingNotFull will drop to zero
180    }
181
182    // destroy the mutex and condition variables
183    if (initialized & INITIALIZED_CONDNOTEMPTY) {
184        ok = pthread_cond_destroy(&tp->mCondNotEmpty);
185        assert(0 == ok);
186    }
187    if (initialized & INITIALIZED_CONDNOTFULL) {
188        ok = pthread_cond_destroy(&tp->mCondNotFull);
189        assert(0 == ok);
190    }
191    if (initialized & INITIALIZED_MUTEX) {
192        ok = pthread_mutex_destroy(&tp->mMutex);
193        assert(0 == ok);
194    }
195    tp->mInitialized = INITIALIZED_NONE;
196
197    // release the closure circular buffer
198    if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) {
199        free(tp->mClosureArray);
200        tp->mClosureArray = NULL;
201    }
202
203    // release the thread pool
204    if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) {
205        free(tp->mThreadArray);
206        tp->mThreadArray = NULL;
207    }
208
209}
210
211void ThreadPool_deinit(ThreadPool *tp)
212{
213    ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads);
214}
215
216// Enqueue a closure to be executed later by a worker thread
217SLresult ThreadPool_add(ThreadPool *tp, void (*handler)(void *, int), void *context, int parameter)
218{
219    assert(NULL != tp);
220    assert(NULL != handler);
221    Closure *closure = (Closure *) malloc(sizeof(Closure));
222    if (NULL == closure)
223        return SL_RESULT_RESOURCE_ERROR;
224    closure->mHandler = handler;
225    closure->mContext = context;
226    closure->mParameter = parameter;
227    int ok;
228    ok = pthread_mutex_lock(&tp->mMutex);
229    assert(0 == ok);
230    // can't enqueue while thread pool shutting down
231    if (tp->mShutdown) {
232        ok = pthread_mutex_unlock(&tp->mMutex);
233        assert(0 == ok);
234        free(closure);
235        return SL_RESULT_PRECONDITIONS_VIOLATED;
236    }
237    for (;;) {
238        Closure **oldRear = tp->mClosureRear;
239        Closure **newRear = oldRear;
240        if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1])
241            newRear = tp->mClosureArray;
242        // if closure circular buffer is full, then wait for it to become non-full
243        if (newRear == tp->mClosureFront) {
244            ++tp->mWaitingNotFull;
245            ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex);
246            assert(0 == ok);
247            // can't enqueue while thread pool shutting down
248            if (tp->mShutdown) {
249                assert(0 < tp->mWaitingNotFull);
250                --tp->mWaitingNotFull;
251                ok = pthread_mutex_unlock(&tp->mMutex);
252                assert(0 == ok);
253                free(closure);
254                return SL_RESULT_PRECONDITIONS_VIOLATED;
255            }
256            continue;
257        }
258        assert(NULL == *oldRear);
259        *oldRear = closure;
260        tp->mClosureRear = newRear;
261        // if a worker thread was waiting to dequeue, then suggest that it try again
262        if (0 < tp->mWaitingNotEmpty) {
263            --tp->mWaitingNotEmpty;
264            ok = pthread_cond_signal(&tp->mCondNotEmpty);
265            assert(0 == ok);
266        }
267        break;
268    }
269    ok = pthread_mutex_unlock(&tp->mMutex);
270    assert(0 == ok);
271    return SL_RESULT_SUCCESS;
272}
273
274// Called by a worker thread when it is ready to accept the next closure to execute
275Closure *ThreadPool_remove(ThreadPool *tp)
276{
277    Closure *pClosure;
278    int ok;
279    ok = pthread_mutex_lock(&tp->mMutex);
280    assert(0 == ok);
281    for (;;) {
282        Closure **oldFront = tp->mClosureFront;
283        // if closure circular buffer is empty, then wait for it to become non-empty
284        if (oldFront == tp->mClosureRear) {
285            ++tp->mWaitingNotEmpty;
286            ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex);
287            assert(0 == ok);
288            // fail if thread pool is shutting down
289            if (tp->mShutdown) {
290                assert(0 < tp->mWaitingNotEmpty);
291                --tp->mWaitingNotEmpty;
292                pClosure = NULL;
293                break;
294            }
295            // try again
296            continue;
297        }
298        // dequeue the closure at front of circular buffer
299        Closure **newFront = oldFront;
300        if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
301            newFront = tp->mClosureArray;
302        pClosure = *oldFront;
303        assert(NULL != pClosure);
304        *oldFront = NULL;
305        tp->mClosureFront = newFront;
306        // if a client thread was waiting to enqueue, then suggest that it try again
307        if (0 < tp->mWaitingNotFull) {
308            --tp->mWaitingNotFull;
309            ok = pthread_cond_signal(&tp->mCondNotFull);
310            assert(0 == ok);
311        }
312        break;
313    }
314    ok = pthread_mutex_unlock(&tp->mMutex);
315    assert(0 == ok);
316    return pClosure;
317}
318