ThreadPool.c revision 157ad9aa0b0a8709e14439c1b6555ef68d0731b8
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/* ThreadPool */
18
19#include "sles_allinclusive.h"
20
21// Entry point for each worker thread
22
23static void *ThreadPool_start(void *context)
24{
25    ThreadPool *tp = (ThreadPool *) context;
26    assert(NULL != tp);
27    for (;;) {
28        Closure *pClosure = ThreadPool_remove(tp);
29        // closure is NULL when thread pool is being destroyed
30        if (NULL == pClosure) {
31            break;
32        }
33        // make a copy of parameters, then free the parameters
34        const Closure closure = *pClosure;
35        free(pClosure);
36        // extract parameters and call the right method depending on kind
37        ClosureKind kind = closure.mKind;
38        void *context1 = closure.mContext1;
39        void *context2 = closure.mContext2;
40        int parameter1 = closure.mParameter1;
41        switch (kind) {
42          case CLOSURE_KIND_PPI:
43            {
44            ClosureHandler_ppi handler_ppi = closure.mHandler.mHandler_ppi;
45            assert(NULL != handler_ppi);
46            (*handler_ppi)(context1, context2, parameter1);
47            }
48            break;
49          case CLOSURE_KIND_PPII:
50            {
51            ClosureHandler_ppii handler_ppii = closure.mHandler.mHandler_ppii;
52            assert(NULL != handler_ppii);
53            int parameter2 = closure.mParameter2;
54            (*handler_ppii)(context1, context2, parameter1, parameter2);
55            }
56            break;
57          case CLOSURE_KIND_PIIPP:
58            {
59            ClosureHandler_piipp handler_piipp = closure.mHandler.mHandler_piipp;
60            assert(NULL != handler_piipp);
61            int parameter2 = closure.mParameter2;
62            void *context3 = closure.mContext3;
63            (*handler_piipp)(context1, parameter1, parameter2, context2, context3);
64            }
65            break;
66          default:
67            SL_LOGE("Unexpected callback kind %d", kind);
68            assert(false);
69            break;
70        }
71    }
72    return NULL;
73}
74
// Bit flags recording which synchronization objects of a ThreadPool have been
// successfully initialized, so that a partial initialization can be undone.
#define INITIALIZED_NONE         0x0
#define INITIALIZED_MUTEX        0x1
#define INITIALIZED_CONDNOTFULL  0x2
#define INITIALIZED_CONDNOTEMPTY 0x4
// every flag above set
#define INITIALIZED_ALL \
    (INITIALIZED_MUTEX | INITIALIZED_CONDNOTFULL | INITIALIZED_CONDNOTEMPTY)
80
81static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads);
82
83// Initialize a ThreadPool
84// maxClosures defaults to CLOSURE_TYPICAL if 0
85// maxThreads defaults to THREAD_TYPICAL if 0
86
87SLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads)
88{
89    assert(NULL != tp);
90    memset(tp, 0, sizeof(ThreadPool));
91    tp->mShutdown = SL_BOOLEAN_FALSE;
92    unsigned initialized = INITIALIZED_NONE;    // which objects were successfully initialized
93    unsigned nThreads = 0;                      // number of threads successfully created
94    int err;
95    SLresult result;
96
97    // initialize mutex and condition variables
98    err = pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL);
99    result = err_to_result(err);
100    if (SL_RESULT_SUCCESS != result)
101        goto fail;
102    initialized |= INITIALIZED_MUTEX;
103    err = pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL);
104    result = err_to_result(err);
105    if (SL_RESULT_SUCCESS != result)
106        goto fail;
107    initialized |= INITIALIZED_CONDNOTFULL;
108    err = pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL);
109    result = err_to_result(err);
110    if (SL_RESULT_SUCCESS != result)
111        goto fail;
112    initialized |= INITIALIZED_CONDNOTEMPTY;
113
114    // use default values for parameters, if not specified explicitly
115    tp->mWaitingNotFull = 0;
116    tp->mWaitingNotEmpty = 0;
117    if (0 == maxClosures)
118        maxClosures = CLOSURE_TYPICAL;
119    tp->mMaxClosures = maxClosures;
120    if (0 == maxThreads)
121        maxThreads = THREAD_TYPICAL;
122    tp->mMaxThreads = maxThreads;
123
124    // initialize circular buffer for closures
125    if (CLOSURE_TYPICAL >= maxClosures) {
126        tp->mClosureArray = tp->mClosureTypical;
127    } else {
128        tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *));
129        if (NULL == tp->mClosureArray) {
130            result = SL_RESULT_RESOURCE_ERROR;
131            goto fail;
132        }
133    }
134    tp->mClosureFront = tp->mClosureArray;
135    tp->mClosureRear = tp->mClosureArray;
136
137    // initialize thread pool
138    if (THREAD_TYPICAL >= maxThreads) {
139        tp->mThreadArray = tp->mThreadTypical;
140    } else {
141        tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t));
142        if (NULL == tp->mThreadArray) {
143            result = SL_RESULT_RESOURCE_ERROR;
144            goto fail;
145        }
146    }
147    unsigned i;
148    for (i = 0; i < maxThreads; ++i) {
149        int err = pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL,
150            ThreadPool_start, tp);
151        result = err_to_result(err);
152        if (SL_RESULT_SUCCESS != result)
153            goto fail;
154        ++nThreads;
155    }
156    tp->mInitialized = initialized;
157
158    // done
159    return SL_RESULT_SUCCESS;
160
161    // here on any kind of error
162fail:
163    ThreadPool_deinit_internal(tp, initialized, nThreads);
164    return result;
165}
166
167static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads)
168{
169    int ok;
170
171    assert(NULL != tp);
172    // Destroy all threads
173    if (0 < nThreads) {
174        assert(INITIALIZED_ALL == initialized);
175        ok = pthread_mutex_lock(&tp->mMutex);
176        assert(0 == ok);
177        tp->mShutdown = SL_BOOLEAN_TRUE;
178        ok = pthread_cond_broadcast(&tp->mCondNotEmpty);
179        assert(0 == ok);
180        ok = pthread_cond_broadcast(&tp->mCondNotFull);
181        assert(0 == ok);
182        ok = pthread_mutex_unlock(&tp->mMutex);
183        assert(0 == ok);
184        unsigned i;
185        for (i = 0; i < nThreads; ++i) {
186            ok = pthread_join(tp->mThreadArray[i], (void **) NULL);
187            assert(ok == 0);
188        }
189
190        // Empty out the circular buffer of closures
191        ok = pthread_mutex_lock(&tp->mMutex);
192        assert(0 == ok);
193        Closure **oldFront = tp->mClosureFront;
194        while (oldFront != tp->mClosureRear) {
195            Closure **newFront = oldFront;
196            if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
197                newFront = tp->mClosureArray;
198            Closure *pClosure = *oldFront;
199            assert(NULL != pClosure);
200            *oldFront = NULL;
201            tp->mClosureFront = newFront;
202            ok = pthread_mutex_unlock(&tp->mMutex);
203            assert(0 == ok);
204            free(pClosure);
205            ok = pthread_mutex_lock(&tp->mMutex);
206            assert(0 == ok);
207        }
208        ok = pthread_mutex_unlock(&tp->mMutex);
209        assert(0 == ok);
210        // Note that we can't be sure when mWaitingNotFull will drop to zero
211    }
212
213    // destroy the mutex and condition variables
214    if (initialized & INITIALIZED_CONDNOTEMPTY) {
215        ok = pthread_cond_destroy(&tp->mCondNotEmpty);
216        assert(0 == ok);
217    }
218    if (initialized & INITIALIZED_CONDNOTFULL) {
219        ok = pthread_cond_destroy(&tp->mCondNotFull);
220        assert(0 == ok);
221    }
222    if (initialized & INITIALIZED_MUTEX) {
223        ok = pthread_mutex_destroy(&tp->mMutex);
224        assert(0 == ok);
225    }
226    tp->mInitialized = INITIALIZED_NONE;
227
228    // release the closure circular buffer
229    if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) {
230        free(tp->mClosureArray);
231        tp->mClosureArray = NULL;
232    }
233
234    // release the thread pool
235    if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) {
236        free(tp->mThreadArray);
237        tp->mThreadArray = NULL;
238    }
239
240}
241
242void ThreadPool_deinit(ThreadPool *tp)
243{
244    ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads);
245}
246
247// Enqueue a closure to be executed later by a worker thread.
248// Note that this raw interface requires an explicit "kind" and full parameter list.
249// There are convenience methods below that make this easier to use.
250SLresult ThreadPool_add(ThreadPool *tp, ClosureKind kind, ClosureHandler_generic handler,
251        void *context1, void *context2, void *context3, int parameter1, int parameter2)
252{
253    assert(NULL != tp);
254    assert(NULL != handler);
255    Closure *closure = (Closure *) malloc(sizeof(Closure));
256    if (NULL == closure) {
257        return SL_RESULT_RESOURCE_ERROR;
258    }
259    closure->mKind = kind;
260    switch(kind) {
261      case CLOSURE_KIND_PPI:
262        closure->mHandler.mHandler_ppi = (ClosureHandler_ppi)handler;
263        break;
264      case CLOSURE_KIND_PPII:
265        closure->mHandler.mHandler_ppii = (ClosureHandler_ppii)handler;
266        break;
267      case CLOSURE_KIND_PIIPP:
268        closure->mHandler.mHandler_piipp = (ClosureHandler_piipp)handler;
269        break;
270      default:
271        SL_LOGE("ThreadPool_add() invalid closure kind %d", kind);
272        assert(false);
273    }
274    closure->mContext1 = context1;
275    closure->mContext2 = context2;
276    closure->mContext3 = context3;
277    closure->mParameter1 = parameter1;
278    closure->mParameter2 = parameter2;
279    int ok;
280    ok = pthread_mutex_lock(&tp->mMutex);
281    assert(0 == ok);
282    // can't enqueue while thread pool shutting down
283    if (tp->mShutdown) {
284        ok = pthread_mutex_unlock(&tp->mMutex);
285        assert(0 == ok);
286        free(closure);
287        return SL_RESULT_PRECONDITIONS_VIOLATED;
288    }
289    for (;;) {
290        Closure **oldRear = tp->mClosureRear;
291        Closure **newRear = oldRear;
292        if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1])
293            newRear = tp->mClosureArray;
294        // if closure circular buffer is full, then wait for it to become non-full
295        if (newRear == tp->mClosureFront) {
296            ++tp->mWaitingNotFull;
297            ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex);
298            assert(0 == ok);
299            // can't enqueue while thread pool shutting down
300            if (tp->mShutdown) {
301                assert(0 < tp->mWaitingNotFull);
302                --tp->mWaitingNotFull;
303                ok = pthread_mutex_unlock(&tp->mMutex);
304                assert(0 == ok);
305                free(closure);
306                return SL_RESULT_PRECONDITIONS_VIOLATED;
307            }
308            continue;
309        }
310        assert(NULL == *oldRear);
311        *oldRear = closure;
312        tp->mClosureRear = newRear;
313        // if a worker thread was waiting to dequeue, then suggest that it try again
314        if (0 < tp->mWaitingNotEmpty) {
315            --tp->mWaitingNotEmpty;
316            ok = pthread_cond_signal(&tp->mCondNotEmpty);
317            assert(0 == ok);
318        }
319        break;
320    }
321    ok = pthread_mutex_unlock(&tp->mMutex);
322    assert(0 == ok);
323    return SL_RESULT_SUCCESS;
324}
325
326// Called by a worker thread when it is ready to accept the next closure to execute
327Closure *ThreadPool_remove(ThreadPool *tp)
328{
329    Closure *pClosure;
330    int ok;
331    ok = pthread_mutex_lock(&tp->mMutex);
332    assert(0 == ok);
333    for (;;) {
334        // fail if thread pool is shutting down
335        if (tp->mShutdown) {
336            pClosure = NULL;
337            break;
338        }
339        Closure **oldFront = tp->mClosureFront;
340        // if closure circular buffer is empty, then wait for it to become non-empty
341        if (oldFront == tp->mClosureRear) {
342            ++tp->mWaitingNotEmpty;
343            ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex);
344            assert(0 == ok);
345            // try again
346            continue;
347        }
348        // dequeue the closure at front of circular buffer
349        Closure **newFront = oldFront;
350        if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1]) {
351            newFront = tp->mClosureArray;
352        }
353        pClosure = *oldFront;
354        assert(NULL != pClosure);
355        *oldFront = NULL;
356        tp->mClosureFront = newFront;
357        // if a client thread was waiting to enqueue, then suggest that it try again
358        if (0 < tp->mWaitingNotFull) {
359            --tp->mWaitingNotFull;
360            ok = pthread_cond_signal(&tp->mCondNotFull);
361            assert(0 == ok);
362        }
363        break;
364    }
365    ok = pthread_mutex_unlock(&tp->mMutex);
366    assert(0 == ok);
367    return pClosure;
368}
369
370// Convenience methods for applications
371SLresult ThreadPool_add_ppi(ThreadPool *tp, ClosureHandler_ppi handler,
372        void *context1, void *context2, int parameter1)
373{
374    // function pointers are the same size so this is a safe cast
375    return ThreadPool_add(tp, CLOSURE_KIND_PPI, (ClosureHandler_generic) handler,
376            context1, context2, NULL, parameter1, 0);
377}
378
379SLresult ThreadPool_add_ppii(ThreadPool *tp, ClosureHandler_ppii handler,
380        void *context1, void *context2, int parameter1, int parameter2)
381{
382    // function pointers are the same size so this is a safe cast
383    return ThreadPool_add(tp, CLOSURE_KIND_PPII, (ClosureHandler_generic) handler,
384            context1, context2, NULL, parameter1, parameter2);
385}
386
387SLresult ThreadPool_add_piipp(ThreadPool *tp, ClosureHandler_piipp handler,
388        void *cntxt1, int param1, int param2, void *cntxt2, void *cntxt3)
389{
390    // function pointers are the same size so this is a safe cast
391    return ThreadPool_add(tp, CLOSURE_KIND_PIIPP, (ClosureHandler_generic) handler,
392            cntxt1, cntxt2, cntxt3, param1, param2);
393}
394