/* ThreadPool.c revision d527933b750b107449263fff2c9d870edbfcc520 */
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/* ThreadPool */
18
19#include "sles_allinclusive.h"
20
21// Entry point for each worker thread
22
23static void *ThreadPool_start(void *context)
24{
25    ThreadPool *tp = (ThreadPool *) context;
26    assert(NULL != tp);
27    for (;;) {
28        Closure *pClosure = ThreadPool_remove(tp);
29        // closure is NULL when thread pool is being destroyed
30        if (NULL == pClosure) {
31            break;
32        }
33        // make a copy of parameters, then free the parameters
34        const Closure closure = *pClosure;
35        free(pClosure);
36        // extract parameters and call the right method depending on kind
37        ClosureKind kind = closure.mKind;
38        void *context1 = closure.mContext1;
39        void *context2 = closure.mContext2;
40        int parameter1 = closure.mParameter1;
41        switch (kind) {
42          case CLOSURE_KIND_PPI:
43            {
44            ClosureHandler_ppi handler_ppi = closure.mHandler.mHandler_ppi;
45            assert(NULL != handler_ppi);
46            (*handler_ppi)(context1, context2, parameter1);
47            }
48            break;
49          case CLOSURE_KIND_PPII:
50            {
51            ClosureHandler_ppii handler_ppii = closure.mHandler.mHandler_ppii;
52            assert(NULL != handler_ppii);
53            int parameter2 = closure.mParameter2;
54            (*handler_ppii)(context1, context2, parameter1, parameter2);
55            }
56            break;
57          case CLOSURE_KIND_PIIPP:
58            {
59            ClosureHandler_piipp handler_piipp = closure.mHandler.mHandler_piipp;
60            assert(NULL != handler_piipp);
61            int parameter2 = closure.mParameter2;
62            void *context3 = closure.mContext3;
63            (*handler_piipp)(context1, parameter1, parameter2, context2, context3);
64            }
65            break;
66          default:
67            SL_LOGE("Unexpected callback kind %d", kind);
68            assert(false);
69            break;
70        }
71    }
72    return NULL;
73}
74
// Bitmask recording which of a ThreadPool's synchronization objects were
// successfully initialized, so a failed or partial init can be torn down
// safely by ThreadPool_deinit_internal.
#define INITIALIZED_NONE         0
#define INITIALIZED_MUTEX        1
#define INITIALIZED_CONDNOTFULL  2
#define INITIALIZED_CONDNOTEMPTY 4
#define INITIALIZED_ALL          7

// Forward declaration of the common teardown path shared by a failed
// ThreadPool_init and by ThreadPool_deinit.
static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads);
82
83// Initialize a ThreadPool
84// maxClosures defaults to CLOSURE_TYPICAL if 0
85// maxThreads defaults to THREAD_TYPICAL if 0
86
87SLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads)
88{
89    assert(NULL != tp);
90    memset(tp, 0, sizeof(ThreadPool));
91    tp->mShutdown = SL_BOOLEAN_FALSE;
92    unsigned initialized = INITIALIZED_NONE;    // which objects were successfully initialized
93    unsigned nThreads = 0;                      // number of threads successfully created
94    int err;
95    SLresult result;
96
97    // initialize mutex and condition variables
98    err = pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL);
99    result = err_to_result(err);
100    if (SL_RESULT_SUCCESS != result)
101        goto fail;
102    initialized |= INITIALIZED_MUTEX;
103    err = pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL);
104    result = err_to_result(err);
105    if (SL_RESULT_SUCCESS != result)
106        goto fail;
107    initialized |= INITIALIZED_CONDNOTFULL;
108    err = pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL);
109    result = err_to_result(err);
110    if (SL_RESULT_SUCCESS != result)
111        goto fail;
112    initialized |= INITIALIZED_CONDNOTEMPTY;
113
114    // use default values for parameters, if not specified explicitly
115    tp->mWaitingNotFull = 0;
116    tp->mWaitingNotEmpty = 0;
117    if (0 == maxClosures)
118        maxClosures = CLOSURE_TYPICAL;
119    tp->mMaxClosures = maxClosures;
120    if (0 == maxThreads)
121        maxThreads = THREAD_TYPICAL;
122    tp->mMaxThreads = maxThreads;
123
124    // initialize circular buffer for closures
125    if (CLOSURE_TYPICAL >= maxClosures) {
126        tp->mClosureArray = tp->mClosureTypical;
127    } else {
128        tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *));
129        if (NULL == tp->mClosureArray) {
130            result = SL_RESULT_RESOURCE_ERROR;
131            goto fail;
132        }
133    }
134    tp->mClosureFront = tp->mClosureArray;
135    tp->mClosureRear = tp->mClosureArray;
136
137    // initialize thread pool
138    if (THREAD_TYPICAL >= maxThreads) {
139        tp->mThreadArray = tp->mThreadTypical;
140    } else {
141        tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t));
142        if (NULL == tp->mThreadArray) {
143            result = SL_RESULT_RESOURCE_ERROR;
144            goto fail;
145        }
146    }
147    unsigned i;
148    for (i = 0; i < maxThreads; ++i) {
149        int err = pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL,
150            ThreadPool_start, tp);
151        result = err_to_result(err);
152        if (SL_RESULT_SUCCESS != result)
153            goto fail;
154        ++nThreads;
155    }
156    tp->mInitialized = initialized;
157
158    // done
159    return SL_RESULT_SUCCESS;
160
161    // here on any kind of error
162fail:
163    ThreadPool_deinit_internal(tp, initialized, nThreads);
164    return result;
165}
166
167static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads)
168{
169    int ok;
170
171    assert(NULL != tp);
172    // Destroy all threads
173    if (0 < nThreads) {
174        assert(INITIALIZED_ALL == initialized);
175        ok = pthread_mutex_lock(&tp->mMutex);
176        assert(0 == ok);
177        tp->mShutdown = SL_BOOLEAN_TRUE;
178        ok = pthread_cond_broadcast(&tp->mCondNotEmpty);
179        assert(0 == ok);
180        ok = pthread_cond_broadcast(&tp->mCondNotFull);
181        assert(0 == ok);
182        ok = pthread_mutex_unlock(&tp->mMutex);
183        assert(0 == ok);
184        unsigned i;
185        for (i = 0; i < nThreads; ++i) {
186            ok = pthread_join(tp->mThreadArray[i], (void **) NULL);
187            assert(ok == 0);
188        }
189
190        // Empty out the circular buffer of closures
191        ok = pthread_mutex_lock(&tp->mMutex);
192        assert(0 == ok);
193        assert(0 == tp->mWaitingNotEmpty);
194        Closure **oldFront = tp->mClosureFront;
195        while (oldFront != tp->mClosureRear) {
196            Closure **newFront = oldFront;
197            if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
198                newFront = tp->mClosureArray;
199            Closure *pClosure = *oldFront;
200            assert(NULL != pClosure);
201            *oldFront = NULL;
202            tp->mClosureFront = newFront;
203            ok = pthread_mutex_unlock(&tp->mMutex);
204            assert(0 == ok);
205            free(pClosure);
206            ok = pthread_mutex_lock(&tp->mMutex);
207            assert(0 == ok);
208        }
209        ok = pthread_mutex_unlock(&tp->mMutex);
210        assert(0 == ok);
211        // Note that we can't be sure when mWaitingNotFull will drop to zero
212    }
213
214    // destroy the mutex and condition variables
215    if (initialized & INITIALIZED_CONDNOTEMPTY) {
216        ok = pthread_cond_destroy(&tp->mCondNotEmpty);
217        assert(0 == ok);
218    }
219    if (initialized & INITIALIZED_CONDNOTFULL) {
220        ok = pthread_cond_destroy(&tp->mCondNotFull);
221        assert(0 == ok);
222    }
223    if (initialized & INITIALIZED_MUTEX) {
224        ok = pthread_mutex_destroy(&tp->mMutex);
225        assert(0 == ok);
226    }
227    tp->mInitialized = INITIALIZED_NONE;
228
229    // release the closure circular buffer
230    if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) {
231        free(tp->mClosureArray);
232        tp->mClosureArray = NULL;
233    }
234
235    // release the thread pool
236    if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) {
237        free(tp->mThreadArray);
238        tp->mThreadArray = NULL;
239    }
240
241}
242
// Destroy a ThreadPool.
// NOTE(review): passes mMaxThreads as the started-thread count, which is only
// accurate after a fully successful ThreadPool_init — presumably callers only
// invoke this on a successfully initialized pool; a failed init tears itself
// down internally. Confirm against callers.
void ThreadPool_deinit(ThreadPool *tp)
{
    ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads);
}
247
248// Enqueue a closure to be executed later by a worker thread.
249// Note that this raw interface requires an explicit "kind" and full parameter list.
250// There are convenience methods below that make this easier to use.
251SLresult ThreadPool_add(ThreadPool *tp, ClosureKind kind, ClosureHandler_generic handler,
252        void *context1, void *context2, void *context3, int parameter1, int parameter2)
253{
254    assert(NULL != tp);
255    assert(NULL != handler);
256    Closure *closure = (Closure *) malloc(sizeof(Closure));
257    if (NULL == closure) {
258        return SL_RESULT_RESOURCE_ERROR;
259    }
260    closure->mKind = kind;
261    switch(kind) {
262      case CLOSURE_KIND_PPI:
263        closure->mHandler.mHandler_ppi = (ClosureHandler_ppi)handler;
264        break;
265      case CLOSURE_KIND_PPII:
266        closure->mHandler.mHandler_ppii = (ClosureHandler_ppii)handler;
267        break;
268      case CLOSURE_KIND_PIIPP:
269        closure->mHandler.mHandler_piipp = (ClosureHandler_piipp)handler;
270        break;
271      default:
272        SL_LOGE("ThreadPool_add() invalid closure kind %d", kind);
273        assert(false);
274    }
275    closure->mContext1 = context1;
276    closure->mContext2 = context2;
277    closure->mContext3 = context3;
278    closure->mParameter1 = parameter1;
279    closure->mParameter2 = parameter2;
280    int ok;
281    ok = pthread_mutex_lock(&tp->mMutex);
282    assert(0 == ok);
283    // can't enqueue while thread pool shutting down
284    if (tp->mShutdown) {
285        ok = pthread_mutex_unlock(&tp->mMutex);
286        assert(0 == ok);
287        free(closure);
288        return SL_RESULT_PRECONDITIONS_VIOLATED;
289    }
290    for (;;) {
291        Closure **oldRear = tp->mClosureRear;
292        Closure **newRear = oldRear;
293        if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1])
294            newRear = tp->mClosureArray;
295        // if closure circular buffer is full, then wait for it to become non-full
296        if (newRear == tp->mClosureFront) {
297            ++tp->mWaitingNotFull;
298            ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex);
299            assert(0 == ok);
300            // can't enqueue while thread pool shutting down
301            if (tp->mShutdown) {
302                assert(0 < tp->mWaitingNotFull);
303                --tp->mWaitingNotFull;
304                ok = pthread_mutex_unlock(&tp->mMutex);
305                assert(0 == ok);
306                free(closure);
307                return SL_RESULT_PRECONDITIONS_VIOLATED;
308            }
309            continue;
310        }
311        assert(NULL == *oldRear);
312        *oldRear = closure;
313        tp->mClosureRear = newRear;
314        // if a worker thread was waiting to dequeue, then suggest that it try again
315        if (0 < tp->mWaitingNotEmpty) {
316            --tp->mWaitingNotEmpty;
317            ok = pthread_cond_signal(&tp->mCondNotEmpty);
318            assert(0 == ok);
319        }
320        break;
321    }
322    ok = pthread_mutex_unlock(&tp->mMutex);
323    assert(0 == ok);
324    return SL_RESULT_SUCCESS;
325}
326
327// Called by a worker thread when it is ready to accept the next closure to execute
328Closure *ThreadPool_remove(ThreadPool *tp)
329{
330    Closure *pClosure;
331    int ok;
332    ok = pthread_mutex_lock(&tp->mMutex);
333    assert(0 == ok);
334    for (;;) {
335        // fail if thread pool is shutting down
336        if (tp->mShutdown) {
337            pClosure = NULL;
338            break;
339        }
340        Closure **oldFront = tp->mClosureFront;
341        // if closure circular buffer is empty, then wait for it to become non-empty
342        if (oldFront == tp->mClosureRear) {
343            ++tp->mWaitingNotEmpty;
344            ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex);
345            assert(0 == ok);
346            // try again
347            continue;
348        }
349        // dequeue the closure at front of circular buffer
350        Closure **newFront = oldFront;
351        if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1]) {
352            newFront = tp->mClosureArray;
353        }
354        pClosure = *oldFront;
355        assert(NULL != pClosure);
356        *oldFront = NULL;
357        tp->mClosureFront = newFront;
358        // if a client thread was waiting to enqueue, then suggest that it try again
359        if (0 < tp->mWaitingNotFull) {
360            --tp->mWaitingNotFull;
361            ok = pthread_cond_signal(&tp->mCondNotFull);
362            assert(0 == ok);
363        }
364        break;
365    }
366    ok = pthread_mutex_unlock(&tp->mMutex);
367    assert(0 == ok);
368    return pClosure;
369}
370
371// Convenience methods for applications
372SLresult ThreadPool_add_ppi(ThreadPool *tp, ClosureHandler_ppi handler,
373        void *context1, void *context2, int parameter1)
374{
375    // function pointers are the same size so this is a safe cast
376    return ThreadPool_add(tp, CLOSURE_KIND_PPI, (ClosureHandler_generic) handler,
377            context1, context2, NULL, parameter1, 0);
378}
379
380SLresult ThreadPool_add_ppii(ThreadPool *tp, ClosureHandler_ppii handler,
381        void *context1, void *context2, int parameter1, int parameter2)
382{
383    // function pointers are the same size so this is a safe cast
384    return ThreadPool_add(tp, CLOSURE_KIND_PPII, (ClosureHandler_generic) handler,
385            context1, context2, NULL, parameter1, parameter2);
386}
387
388SLresult ThreadPool_add_piipp(ThreadPool *tp, ClosureHandler_piipp handler,
389        void *cntxt1, int param1, int param2, void *cntxt2, void *cntxt3)
390{
391    // function pointers are the same size so this is a safe cast
392    return ThreadPool_add(tp, CLOSURE_KIND_PIIPP, (ClosureHandler_generic) handler,
393            cntxt1, cntxt2, cntxt3, param1, param2);
394}
395