ThreadPool.cpp revision 4597a7427b697df31d0bbf4c2040806d0c27b6e0
1d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten/*
2d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * Copyright (C) 2010 The Android Open Source Project
3d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten *
4d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * Licensed under the Apache License, Version 2.0 (the "License");
5d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * you may not use this file except in compliance with the License.
6d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * You may obtain a copy of the License at
7d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten *
8d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten *      http://www.apache.org/licenses/LICENSE-2.0
9d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten *
10d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * Unless required by applicable law or agreed to in writing, software
11d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * distributed under the License is distributed on an "AS IS" BASIS,
12d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * See the License for the specific language governing permissions and
14d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten * limitations under the License.
15d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten */
16d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
17d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten/* ThreadPool */
18d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
19d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#include "sles_allinclusive.h"
20d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
21d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten// Entry point for each worker thread
22d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
23d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenstatic void *ThreadPool_start(void *context)
24d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{
25d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    ThreadPool *tp = (ThreadPool *) context;
26d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    assert(NULL != tp);
27d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    for (;;) {
28d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        Closure *pClosure = ThreadPool_remove(tp);
29510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        // closure is NULL when thread pool is being destroyed
30d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (NULL == pClosure)
31d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            break;
32510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        void (*handler)(void *, int);
33d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        handler = pClosure->mHandler;
34510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        void *context = pClosure->mContext;
35510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        int parameter = pClosure->mParameter;
36510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        free(pClosure);
37d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(NULL != handler);
38510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        (*handler)(context, parameter);
39d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    }
40d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    return NULL;
41d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten}
42d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
43d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_NONE         0
44d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_MUTEX        1
45d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_CONDNOTFULL  2
46d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_CONDNOTEMPTY 4
47d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten#define INITIALIZED_ALL          7
48d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
49d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenstatic void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads);
50d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
51d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten// Initialize a ThreadPool
52d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten// maxClosures defaults to CLOSURE_TYPICAL if 0
53d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten// maxThreads defaults to THREAD_TYPICAL if 0
54d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
55d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn KastenSLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads)
56d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{
57d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    assert(NULL != tp);
58d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    memset(tp, 0, sizeof(ThreadPool));
59d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    tp->mShutdown = SL_BOOLEAN_FALSE;
60510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    unsigned initialized = INITIALIZED_NONE;    // which objects were successfully initialized
61510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    unsigned nThreads = 0;                      // number of threads successfully created
62510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    int err;
63d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    SLresult result;
64510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
65510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // initialize mutex and condition variables
66510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    err = pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL);
67510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    result = err_to_result(err);
68d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (SL_RESULT_SUCCESS != result)
69d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        goto fail;
70d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    initialized |= INITIALIZED_MUTEX;
71510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    err = pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL);
72510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    result = err_to_result(err);
73d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (SL_RESULT_SUCCESS != result)
74d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        goto fail;
75d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    initialized |= INITIALIZED_CONDNOTFULL;
76510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    err = pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL);
77510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    result = err_to_result(err);
78d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (SL_RESULT_SUCCESS != result)
79d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        goto fail;
80d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    initialized |= INITIALIZED_CONDNOTEMPTY;
81510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
82510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // use default values for parameters, if not specified explicitly
83d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    tp->mWaitingNotFull = 0;
84d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    tp->mWaitingNotEmpty = 0;
85d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (0 == maxClosures)
86d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        maxClosures = CLOSURE_TYPICAL;
87d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    tp->mMaxClosures = maxClosures;
88d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (0 == maxThreads)
89d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        maxThreads = THREAD_TYPICAL;
90d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    tp->mMaxThreads = maxThreads;
91510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
92510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // initialize circular buffer for closures
93d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (CLOSURE_TYPICAL >= maxClosures) {
94d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        tp->mClosureArray = tp->mClosureTypical;
95d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    } else {
96d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *));
97d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (NULL == tp->mClosureArray) {
98d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            result = SL_RESULT_RESOURCE_ERROR;
99d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            goto fail;
100d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        }
101d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    }
102d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    tp->mClosureFront = tp->mClosureArray;
103d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    tp->mClosureRear = tp->mClosureArray;
104510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
105510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // initialize thread pool
106d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (THREAD_TYPICAL >= maxThreads) {
107d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        tp->mThreadArray = tp->mThreadTypical;
108d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    } else {
109d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t));
110d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (NULL == tp->mThreadArray) {
111d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            result = SL_RESULT_RESOURCE_ERROR;
112d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            goto fail;
113d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        }
114d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    }
115d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    unsigned i;
116d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    for (i = 0; i < maxThreads; ++i) {
117510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        int err = pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL,
118510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            ThreadPool_start, tp);
119510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        result = err_to_result(err);
120d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (SL_RESULT_SUCCESS != result)
121d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            goto fail;
122d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        ++nThreads;
123d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    }
124d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    tp->mInitialized = initialized;
125510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
126510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // done
127d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    return SL_RESULT_SUCCESS;
128510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
129510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // here on any kind of error
130d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenfail:
131d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    ThreadPool_deinit_internal(tp, initialized, nThreads);
132d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    return result;
133d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten}
134d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
135d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenstatic void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads)
136d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{
1374597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten    int ok;
1384597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten
139d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    assert(NULL != tp);
140d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    // Destroy all threads
141d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (0 < nThreads) {
142d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(INITIALIZED_ALL == initialized);
143d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        ok = pthread_mutex_lock(&tp->mMutex);
144d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(0 == ok);
145d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        tp->mShutdown = SL_BOOLEAN_TRUE;
146d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        ok = pthread_cond_broadcast(&tp->mCondNotEmpty);
147d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(0 == ok);
148d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        ok = pthread_cond_broadcast(&tp->mCondNotFull);
149d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(0 == ok);
150d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        ok = pthread_mutex_unlock(&tp->mMutex);
151d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(0 == ok);
152d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        unsigned i;
153d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        for (i = 0; i < nThreads; ++i) {
154d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            ok = pthread_join(tp->mThreadArray[i], (void **) NULL);
155d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            assert(ok == 0);
156d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        }
157510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
158510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        // Empty out the circular buffer of closures
159d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        ok = pthread_mutex_lock(&tp->mMutex);
160d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(0 == ok);
161d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(0 == tp->mWaitingNotEmpty);
162510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        Closure **oldFront = tp->mClosureFront;
163510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        while (oldFront != tp->mClosureRear) {
164510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            Closure **newFront = oldFront;
165510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
166510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten                newFront = tp->mClosureArray;
167510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            Closure *pClosure = *oldFront;
168510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            assert(NULL != pClosure);
169510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            *oldFront = NULL;
170510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            tp->mClosureFront = newFront;
171510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            ok = pthread_mutex_unlock(&tp->mMutex);
172510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            assert(0 == ok);
173510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            free(pClosure);
174510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            ok = pthread_mutex_lock(&tp->mMutex);
175510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            assert(0 == ok);
176510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        }
177d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        ok = pthread_mutex_unlock(&tp->mMutex);
178d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(0 == ok);
179d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        // Note that we can't be sure when mWaitingNotFull will drop to zero
180d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    }
181510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
182510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // destroy the mutex and condition variables
1834597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten    if (initialized & INITIALIZED_CONDNOTEMPTY) {
1844597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten        ok = pthread_cond_destroy(&tp->mCondNotEmpty);
1854597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten        assert(0 == ok);
1864597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten    }
1874597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten    if (initialized & INITIALIZED_CONDNOTFULL) {
1884597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten        ok = pthread_cond_destroy(&tp->mCondNotFull);
1894597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten        assert(0 == ok);
1904597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten    }
1914597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten    if (initialized & INITIALIZED_MUTEX) {
1924597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten        ok = pthread_mutex_destroy(&tp->mMutex);
1934597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten        assert(0 == ok);
1944597a7427b697df31d0bbf4c2040806d0c27b6e0Glenn Kasten    }
195d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    tp->mInitialized = INITIALIZED_NONE;
196510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
197510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // release the closure circular buffer
198d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) {
199d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        free(tp->mClosureArray);
200d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        tp->mClosureArray = NULL;
201d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    }
202510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
203510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // release the thread pool
204d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) {
205d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        free(tp->mThreadArray);
206d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        tp->mThreadArray = NULL;
207d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    }
208510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten
209d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten}
210d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
211d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kastenvoid ThreadPool_deinit(ThreadPool *tp)
212d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{
213d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads);
214d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten}
215d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
216510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten// Enqueue a closure to be executed later by a worker thread
217510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn KastenSLresult ThreadPool_add(ThreadPool *tp, void (*handler)(void *, int), void *context, int parameter)
218d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{
219d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    assert(NULL != tp);
220510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    assert(NULL != handler);
221510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    Closure *closure = (Closure *) malloc(sizeof(Closure));
222510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    if (NULL == closure)
223510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        return SL_RESULT_RESOURCE_ERROR;
224510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    closure->mHandler = handler;
225510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    closure->mContext = context;
226510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    closure->mParameter = parameter;
227d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    int ok;
228d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    ok = pthread_mutex_lock(&tp->mMutex);
229d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    assert(0 == ok);
230510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    // can't enqueue while thread pool shutting down
231510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    if (tp->mShutdown) {
232510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        ok = pthread_mutex_unlock(&tp->mMutex);
233510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        assert(0 == ok);
234510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        free(closure);
235510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        return SL_RESULT_PRECONDITIONS_VIOLATED;
236510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    }
237d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    for (;;) {
238d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        Closure **oldRear = tp->mClosureRear;
239d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        Closure **newRear = oldRear;
240d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1])
241d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            newRear = tp->mClosureArray;
242510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        // if closure circular buffer is full, then wait for it to become non-full
243d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (newRear == tp->mClosureFront) {
244d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            ++tp->mWaitingNotFull;
245d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex);
246d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            assert(0 == ok);
247510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            // can't enqueue while thread pool shutting down
248d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            if (tp->mShutdown) {
249d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten                assert(0 < tp->mWaitingNotFull);
250d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten                --tp->mWaitingNotFull;
251d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten                ok = pthread_mutex_unlock(&tp->mMutex);
252d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten                assert(0 == ok);
253510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten                free(closure);
254510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten                return SL_RESULT_PRECONDITIONS_VIOLATED;
255d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            }
256d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            continue;
257d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        }
258d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        assert(NULL == *oldRear);
259d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        *oldRear = closure;
260d4b099b83a686a7bb88f5e36c063f5efe623b56dGlenn Kasten        tp->mClosureRear = newRear;
261510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        // if a worker thread was waiting to dequeue, then suggest that it try again
262d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (0 < tp->mWaitingNotEmpty) {
263d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            --tp->mWaitingNotEmpty;
264d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            ok = pthread_cond_signal(&tp->mCondNotEmpty);
265d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            assert(0 == ok);
266d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        }
267d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        break;
268d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    }
269d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    ok = pthread_mutex_unlock(&tp->mMutex);
270d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    assert(0 == ok);
271510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten    return SL_RESULT_SUCCESS;
272d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten}
273d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten
274510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten// Called by a worker thread when it is ready to accept the next closure to execute
275d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn KastenClosure *ThreadPool_remove(ThreadPool *tp)
276d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten{
277d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    Closure *pClosure;
278d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    int ok;
279d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    ok = pthread_mutex_lock(&tp->mMutex);
280d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    assert(0 == ok);
281d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    for (;;) {
282d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        Closure **oldFront = tp->mClosureFront;
283510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        // if closure circular buffer is empty, then wait for it to become non-empty
284d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (oldFront == tp->mClosureRear) {
285d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            ++tp->mWaitingNotEmpty;
286d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex);
287d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            assert(0 == ok);
288510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            // fail if thread pool is shutting down
289d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            if (tp->mShutdown) {
290d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten                assert(0 < tp->mWaitingNotEmpty);
291d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten                --tp->mWaitingNotEmpty;
292d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten                pClosure = NULL;
293d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten                break;
294d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            }
295510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten            // try again
296d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            continue;
297d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        }
298510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        // dequeue the closure at front of circular buffer
299d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        Closure **newFront = oldFront;
300d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
301d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            newFront = tp->mClosureArray;
302d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        pClosure = *oldFront;
303510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        assert(NULL != pClosure);
304510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        *oldFront = NULL;
305d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        tp->mClosureFront = newFront;
306510f3671f716f6835282e4b0fd0275c20e9dadd8Glenn Kasten        // if a client thread was waiting to enqueue, then suggest that it try again
307d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        if (0 < tp->mWaitingNotFull) {
308d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            --tp->mWaitingNotFull;
309d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            ok = pthread_cond_signal(&tp->mCondNotFull);
310d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten            assert(0 == ok);
311d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        }
312d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten        break;
313d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    }
314d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    ok = pthread_mutex_unlock(&tp->mMutex);
315d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    assert(0 == ok);
316d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten    return pClosure;
317d2a7f0d6883a6d3835642e7b282f05ed1c54fe63Glenn Kasten}
318