thread.c revision 8b720228d581a84fd173b6dcb2fa295b59db489a
1// Copyright 2011 Google Inc. All Rights Reserved. 2// 3// Use of this source code is governed by a BSD-style license 4// that can be found in the COPYING file in the root of the source 5// tree. An additional intellectual property rights grant can be found 6// in the file PATENTS. All contributing project authors may 7// be found in the AUTHORS file in the root of the source tree. 8// ----------------------------------------------------------------------------- 9// 10// Multi-threaded worker 11// 12// Author: Skal (pascal.massimino@gmail.com) 13 14#include <assert.h> 15#include <string.h> // for memset() 16#include "./thread.h" 17 18#ifdef WEBP_USE_THREAD 19 20#if defined(_WIN32) 21 22//------------------------------------------------------------------------------ 23// simplistic pthread emulation layer 24 25#include <process.h> 26 27// _beginthreadex requires __stdcall 28#define THREADFN unsigned int __stdcall 29#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) 30 31static int pthread_create(pthread_t* const thread, const void* attr, 32 unsigned int (__stdcall *start)(void*), void* arg) { 33 (void)attr; 34 *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ 35 0, /* unsigned stack_size */ 36 start, 37 arg, 38 0, /* unsigned initflag */ 39 NULL); /* unsigned *thrdaddr */ 40 if (*thread == NULL) return 1; 41 SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); 42 return 0; 43} 44 45static int pthread_join(pthread_t thread, void** value_ptr) { 46 (void)value_ptr; 47 return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || 48 CloseHandle(thread) == 0); 49} 50 51// Mutex 52static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { 53 (void)mutexattr; 54 InitializeCriticalSection(mutex); 55 return 0; 56} 57 58static int pthread_mutex_lock(pthread_mutex_t* const mutex) { 59 EnterCriticalSection(mutex); 60 return 0; 61} 62 63static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { 64 LeaveCriticalSection(mutex); 65 return 0; 66} 67 68static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { 69 DeleteCriticalSection(mutex); 70 return 0; 71} 72 73// Condition 74static int pthread_cond_destroy(pthread_cond_t* const condition) { 75 int ok = 1; 76 ok &= (CloseHandle(condition->waiting_sem_) != 0); 77 ok &= (CloseHandle(condition->received_sem_) != 0); 78 ok &= (CloseHandle(condition->signal_event_) != 0); 79 return !ok; 80} 81 82static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { 83 (void)cond_attr; 84 condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 85 condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 86 condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); 87 if (condition->waiting_sem_ == NULL || 88 condition->received_sem_ == NULL || 89 condition->signal_event_ == NULL) { 90 pthread_cond_destroy(condition); 91 return 1; 92 } 93 return 0; 94} 95 96static int pthread_cond_signal(pthread_cond_t* const condition) { 97 int ok = 1; 98 if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { 99 // a thread is waiting in pthread_cond_wait: allow it to be notified 100 ok = SetEvent(condition->signal_event_); 101 // wait until the event is consumed so the signaler cannot consume 102 // the event via its own pthread_cond_wait. 103 ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != 104 WAIT_OBJECT_0); 105 } 106 return !ok; 107} 108 109static int pthread_cond_wait(pthread_cond_t* const condition, 110 pthread_mutex_t* const mutex) { 111 int ok; 112 // note that there is a consumer available so the signal isn't dropped in 113 // pthread_cond_signal 114 if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) 115 return 1; 116 // now unlock the mutex so pthread_cond_signal may be issued 117 pthread_mutex_unlock(mutex); 118 ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == 119 WAIT_OBJECT_0); 120 ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); 121 pthread_mutex_lock(mutex); 122 return !ok; 123} 124 125#else // !_WIN32 126# define THREADFN void* 127# define THREAD_RETURN(val) val 128#endif // _WIN32 129 130//------------------------------------------------------------------------------ 131 132static THREADFN ThreadLoop(void* ptr) { 133 WebPWorker* const worker = (WebPWorker*)ptr; 134 int done = 0; 135 while (!done) { 136 pthread_mutex_lock(&worker->mutex_); 137 while (worker->status_ == OK) { // wait in idling mode 138 pthread_cond_wait(&worker->condition_, &worker->mutex_); 139 } 140 if (worker->status_ == WORK) { 141 WebPWorkerExecute(worker); 142 worker->status_ = OK; 143 } else if (worker->status_ == NOT_OK) { // finish the worker 144 done = 1; 145 } 146 // signal to the main thread that we're done (for Sync()) 147 pthread_cond_signal(&worker->condition_); 148 pthread_mutex_unlock(&worker->mutex_); 149 } 150 return THREAD_RETURN(NULL); // Thread is finished 151} 152 153// main thread state control 154static void ChangeState(WebPWorker* const worker, 155 WebPWorkerStatus new_status) { 156 // no-op when attempting to change state on a thread that didn't come up 157 if (worker->status_ < OK) return; 158 159 pthread_mutex_lock(&worker->mutex_); 160 // wait for the worker to finish 161 while (worker->status_ != OK) { 162 pthread_cond_wait(&worker->condition_, &worker->mutex_); 163 } 164 // assign new status and release the working thread if needed 165 if (new_status != OK) { 166 worker->status_ = new_status; 167 pthread_cond_signal(&worker->condition_); 168 } 169 pthread_mutex_unlock(&worker->mutex_); 170} 171 172#endif // WEBP_USE_THREAD 173 174//------------------------------------------------------------------------------ 175 176void WebPWorkerInit(WebPWorker* const worker) { 177 memset(worker, 0, sizeof(*worker)); 178 worker->status_ = NOT_OK; 179} 180 181int WebPWorkerSync(WebPWorker* const worker) { 182#ifdef WEBP_USE_THREAD 183 ChangeState(worker, OK); 184#endif 185 assert(worker->status_ <= OK); 186 return !worker->had_error; 187} 188 189int WebPWorkerReset(WebPWorker* const worker) { 190 int ok = 1; 191 worker->had_error = 0; 192 if (worker->status_ < OK) { 193#ifdef WEBP_USE_THREAD 194 if (pthread_mutex_init(&worker->mutex_, NULL) || 195 pthread_cond_init(&worker->condition_, NULL)) { 196 return 0; 197 } 198 pthread_mutex_lock(&worker->mutex_); 199 ok = !pthread_create(&worker->thread_, NULL, ThreadLoop, worker); 200 if (ok) worker->status_ = OK; 201 pthread_mutex_unlock(&worker->mutex_); 202#else 203 worker->status_ = OK; 204#endif 205 } else if (worker->status_ > OK) { 206 ok = WebPWorkerSync(worker); 207 } 208 assert(!ok || (worker->status_ == OK)); 209 return ok; 210} 211 212void WebPWorkerExecute(WebPWorker* const worker) { 213 if (worker->hook != NULL) { 214 worker->had_error |= !worker->hook(worker->data1, worker->data2); 215 } 216} 217 218void WebPWorkerLaunch(WebPWorker* const worker) { 219#ifdef WEBP_USE_THREAD 220 ChangeState(worker, WORK); 221#else 222 WebPWorkerExecute(worker); 223#endif 224} 225 226void WebPWorkerEnd(WebPWorker* const worker) { 227 if (worker->status_ >= OK) { 228#ifdef WEBP_USE_THREAD 229 ChangeState(worker, NOT_OK); 230 pthread_join(worker->thread_, NULL); 231 pthread_mutex_destroy(&worker->mutex_); 232 pthread_cond_destroy(&worker->condition_); 233#else 234 worker->status_ = NOT_OK; 235#endif 236 } 237 assert(worker->status_ == NOT_OK); 238} 239 240//------------------------------------------------------------------------------ 241 242