ThreadPool.cpp revision d2a7f0d6883a6d3835642e7b282f05ed1c54fe63
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/* ThreadPool */
18
19#include "sles_allinclusive.h"
20
21// Entry point for each worker thread
22
23static void *ThreadPool_start(void *context)
24{
25    ThreadPool *tp = (ThreadPool *) context;
26    assert(NULL != tp);
27    for (;;) {
28        Closure *pClosure = ThreadPool_remove(tp);
29        // closure is NULL when engine is being destroyed
30        if (NULL == pClosure)
31            break;
32        void (*handler)(Closure *);
33        handler = pClosure->mHandler;
34        assert(NULL != handler);
35        (*handler)(pClosure);
36    }
37    return NULL;
38}
39
// Bit flags recording which synchronization primitives of a ThreadPool have been
// initialized, so that teardown destroys only the ones that actually exist.
#define INITIALIZED_NONE         0x0
#define INITIALIZED_MUTEX        0x1 // mMutex
#define INITIALIZED_CONDNOTFULL  0x2 // mCondNotFull
#define INITIALIZED_CONDNOTEMPTY 0x4 // mCondNotEmpty
#define INITIALIZED_ALL \
    (INITIALIZED_MUTEX | INITIALIZED_CONDNOTFULL | INITIALIZED_CONDNOTEMPTY)
45
46static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads);
47
48// Initialize a ThreadPool
49// maxClosures defaults to CLOSURE_TYPICAL if 0
50// maxThreads defaults to THREAD_TYPICAL if 0
51
52SLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads)
53{
54    assert(NULL != tp);
55    memset(tp, 0, sizeof(ThreadPool));
56    tp->mShutdown = SL_BOOLEAN_FALSE;
57    unsigned initialized = INITIALIZED_NONE;
58    unsigned nThreads = 0;
59    SLresult result;
60    result = err_to_result(pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL));
61    if (SL_RESULT_SUCCESS != result)
62        goto fail;
63    initialized |= INITIALIZED_MUTEX;
64    result = err_to_result(pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL));
65    if (SL_RESULT_SUCCESS != result)
66        goto fail;
67    initialized |= INITIALIZED_CONDNOTFULL;
68    result = err_to_result(pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL));
69    if (SL_RESULT_SUCCESS != result)
70        goto fail;
71    initialized |= INITIALIZED_CONDNOTEMPTY;
72    tp->mWaitingNotFull = 0;
73    tp->mWaitingNotEmpty = 0;
74    if (0 == maxClosures)
75        maxClosures = CLOSURE_TYPICAL;
76    tp->mMaxClosures = maxClosures;
77    if (0 == maxThreads)
78        maxThreads = THREAD_TYPICAL;
79    tp->mMaxThreads = maxThreads;
80    if (CLOSURE_TYPICAL >= maxClosures) {
81        tp->mClosureArray = tp->mClosureTypical;
82    } else {
83        tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *));
84        if (NULL == tp->mClosureArray) {
85            result = SL_RESULT_RESOURCE_ERROR;
86            goto fail;
87        }
88    }
89    tp->mClosureFront = tp->mClosureArray;
90    tp->mClosureRear = tp->mClosureArray;
91    if (THREAD_TYPICAL >= maxThreads) {
92        tp->mThreadArray = tp->mThreadTypical;
93    } else {
94        tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t));
95        if (NULL == tp->mThreadArray) {
96            result = SL_RESULT_RESOURCE_ERROR;
97            goto fail;
98        }
99    }
100    unsigned i;
101    for (i = 0; i < maxThreads; ++i) {
102        result = err_to_result(pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL, ThreadPool_start, tp));
103        if (SL_RESULT_SUCCESS != result)
104            goto fail;
105        ++nThreads;
106    }
107    tp->mInitialized = initialized;
108    return SL_RESULT_SUCCESS;
109fail:
110    ThreadPool_deinit_internal(tp, initialized, nThreads);
111    return result;
112}
113
114static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads)
115{
116    assert(NULL != tp);
117    // FIXME Cancel all pending operations
118    // Destroy all threads
119    if (0 < nThreads) {
120        int ok;
121        assert(INITIALIZED_ALL == initialized);
122        ok = pthread_mutex_lock(&tp->mMutex);
123        assert(0 == ok);
124        tp->mShutdown = SL_BOOLEAN_TRUE;
125        ok = pthread_cond_broadcast(&tp->mCondNotEmpty);
126        assert(0 == ok);
127        ok = pthread_cond_broadcast(&tp->mCondNotFull);
128        assert(0 == ok);
129        ok = pthread_mutex_unlock(&tp->mMutex);
130        assert(0 == ok);
131        unsigned i;
132        for (i = 0; i < nThreads; ++i) {
133            ok = pthread_join(tp->mThreadArray[i], (void **) NULL);
134            assert(ok == 0);
135        }
136        ok = pthread_mutex_lock(&tp->mMutex);
137        assert(0 == ok);
138        assert(0 == tp->mWaitingNotEmpty);
139        ok = pthread_mutex_unlock(&tp->mMutex);
140        assert(0 == ok);
141        // Note that we can't be sure when mWaitingNotFull will drop to zero
142    }
143    if (initialized & INITIALIZED_CONDNOTEMPTY)
144        (void) pthread_cond_destroy(&tp->mCondNotEmpty);
145    if (initialized & INITIALIZED_CONDNOTFULL)
146        (void) pthread_cond_destroy(&tp->mCondNotFull);
147    if (initialized & INITIALIZED_MUTEX)
148        (void) pthread_mutex_destroy(&tp->mMutex);
149    tp->mInitialized = INITIALIZED_NONE;
150    if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) {
151        free(tp->mClosureArray);
152        tp->mClosureArray = NULL;
153    }
154    if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) {
155        free(tp->mThreadArray);
156        tp->mThreadArray = NULL;
157    }
158}
159
160void ThreadPool_deinit(ThreadPool *tp)
161{
162    ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads);
163}
164
165bool ThreadPool_add(ThreadPool *tp, Closure *closure)
166{
167    assert(NULL != tp);
168    assert(NULL != closure);
169    int ok;
170    ok = pthread_mutex_lock(&tp->mMutex);
171    assert(0 == ok);
172    for (;;) {
173        Closure **oldRear = tp->mClosureRear;
174        Closure **newRear = oldRear;
175        if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1])
176            newRear = tp->mClosureArray;
177        if (newRear == tp->mClosureFront) {
178            ++tp->mWaitingNotFull;
179            ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex);
180            assert(0 == ok);
181            if (tp->mShutdown) {
182                assert(0 < tp->mWaitingNotFull);
183                --tp->mWaitingNotFull;
184                ok = pthread_mutex_unlock(&tp->mMutex);
185                assert(0 == ok);
186                return false;
187            }
188            continue;
189        }
190        assert(NULL == *oldRear);
191        *oldRear = closure;
192        if (0 < tp->mWaitingNotEmpty) {
193            --tp->mWaitingNotEmpty;
194            ok = pthread_cond_signal(&tp->mCondNotEmpty);
195            assert(0 == ok);
196        }
197        break;
198    }
199    ok = pthread_mutex_unlock(&tp->mMutex);
200    assert(0 == ok);
201    return true;
202}
203
204Closure *ThreadPool_remove(ThreadPool *tp)
205{
206    Closure *pClosure;
207    int ok;
208    ok = pthread_mutex_lock(&tp->mMutex);
209    assert(0 == ok);
210    for (;;) {
211        Closure **oldFront = tp->mClosureFront;
212        if (oldFront == tp->mClosureRear) {
213            ++tp->mWaitingNotEmpty;
214            ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex);
215            assert(0 == ok);
216            if (tp->mShutdown) {
217                assert(0 < tp->mWaitingNotEmpty);
218                --tp->mWaitingNotEmpty;
219                pClosure = NULL;
220                break;
221            }
222            continue;
223        }
224        Closure **newFront = oldFront;
225        if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
226            newFront = tp->mClosureArray;
227        pClosure = *oldFront;
228        tp->mClosureFront = newFront;
229        if (0 < tp->mWaitingNotFull) {
230            --tp->mWaitingNotFull;
231            ok = pthread_cond_signal(&tp->mCondNotFull);
232            assert(0 == ok);
233        }
234        break;
235    }
236    ok = pthread_mutex_unlock(&tp->mMutex);
237    assert(0 == ok);
238    return pClosure;
239}
240