1// Copyright 2011 Google Inc. All Rights Reserved. 2// 3// Use of this source code is governed by a BSD-style license 4// that can be found in the COPYING file in the root of the source 5// tree. An additional intellectual property rights grant can be found 6// in the file PATENTS. All contributing project authors may 7// be found in the AUTHORS file in the root of the source tree. 8// ----------------------------------------------------------------------------- 9// 10// Multi-threaded worker 11// 12// Author: Skal (pascal.massimino@gmail.com) 13 14#include <assert.h> 15#include <string.h> // for memset() 16#include "./thread.h" 17#include "./utils.h" 18 19#ifdef WEBP_USE_THREAD 20 21#if defined(_WIN32) 22 23#include <windows.h> 24typedef HANDLE pthread_t; 25typedef CRITICAL_SECTION pthread_mutex_t; 26typedef struct { 27 HANDLE waiting_sem_; 28 HANDLE received_sem_; 29 HANDLE signal_event_; 30} pthread_cond_t; 31 32#else // !_WIN32 33 34#include <pthread.h> 35 36#endif // _WIN32 37 38struct WebPWorkerImpl { 39 pthread_mutex_t mutex_; 40 pthread_cond_t condition_; 41 pthread_t thread_; 42}; 43 44#if defined(_WIN32) 45 46//------------------------------------------------------------------------------ 47// simplistic pthread emulation layer 48 49#include <process.h> 50 51// _beginthreadex requires __stdcall 52#define THREADFN unsigned int __stdcall 53#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) 54 55static int pthread_create(pthread_t* const thread, const void* attr, 56 unsigned int (__stdcall *start)(void*), void* arg) { 57 (void)attr; 58 *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ 59 0, /* unsigned stack_size */ 60 start, 61 arg, 62 0, /* unsigned initflag */ 63 NULL); /* unsigned *thrdaddr */ 64 if (*thread == NULL) return 1; 65 SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); 66 return 0; 67} 68 69static int pthread_join(pthread_t thread, void** value_ptr) { 70 (void)value_ptr; 71 return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || 72 CloseHandle(thread) == 0); 73} 74 75// Mutex 76static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { 77 (void)mutexattr; 78 InitializeCriticalSection(mutex); 79 return 0; 80} 81 82static int pthread_mutex_lock(pthread_mutex_t* const mutex) { 83 EnterCriticalSection(mutex); 84 return 0; 85} 86 87static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { 88 LeaveCriticalSection(mutex); 89 return 0; 90} 91 92static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { 93 DeleteCriticalSection(mutex); 94 return 0; 95} 96 97// Condition 98static int pthread_cond_destroy(pthread_cond_t* const condition) { 99 int ok = 1; 100 ok &= (CloseHandle(condition->waiting_sem_) != 0); 101 ok &= (CloseHandle(condition->received_sem_) != 0); 102 ok &= (CloseHandle(condition->signal_event_) != 0); 103 return !ok; 104} 105 106static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { 107 (void)cond_attr; 108 condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 109 condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 110 condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); 111 if (condition->waiting_sem_ == NULL || 112 condition->received_sem_ == NULL || 113 condition->signal_event_ == NULL) { 114 pthread_cond_destroy(condition); 115 return 1; 116 } 117 return 0; 118} 119 120static int pthread_cond_signal(pthread_cond_t* const condition) { 121 int ok = 1; 122 if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { 123 // a thread is waiting in pthread_cond_wait: allow it to be notified 124 ok = SetEvent(condition->signal_event_); 125 // wait until the event is consumed so the signaler cannot consume 126 // the event via its own pthread_cond_wait. 127 ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != 128 WAIT_OBJECT_0); 129 } 130 return !ok; 131} 132 133static int pthread_cond_wait(pthread_cond_t* const condition, 134 pthread_mutex_t* const mutex) { 135 int ok; 136 // note that there is a consumer available so the signal isn't dropped in 137 // pthread_cond_signal 138 if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) 139 return 1; 140 // now unlock the mutex so pthread_cond_signal may be issued 141 pthread_mutex_unlock(mutex); 142 ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == 143 WAIT_OBJECT_0); 144 ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); 145 pthread_mutex_lock(mutex); 146 return !ok; 147} 148 149#else // !_WIN32 150# define THREADFN void* 151# define THREAD_RETURN(val) val 152#endif // _WIN32 153 154//------------------------------------------------------------------------------ 155 156static void Execute(WebPWorker* const worker); // Forward declaration. 157 158static THREADFN ThreadLoop(void* ptr) { 159 WebPWorker* const worker = (WebPWorker*)ptr; 160 int done = 0; 161 while (!done) { 162 pthread_mutex_lock(&worker->impl_->mutex_); 163 while (worker->status_ == OK) { // wait in idling mode 164 pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); 165 } 166 if (worker->status_ == WORK) { 167 Execute(worker); 168 worker->status_ = OK; 169 } else if (worker->status_ == NOT_OK) { // finish the worker 170 done = 1; 171 } 172 // signal to the main thread that we're done (for Sync()) 173 pthread_cond_signal(&worker->impl_->condition_); 174 pthread_mutex_unlock(&worker->impl_->mutex_); 175 } 176 return THREAD_RETURN(NULL); // Thread is finished 177} 178 179// main thread state control 180static void ChangeState(WebPWorker* const worker, 181 WebPWorkerStatus new_status) { 182 // No-op when attempting to change state on a thread that didn't come up. 183 // Checking status_ without acquiring the lock first would result in a data 184 // race. 185 if (worker->impl_ == NULL) return; 186 187 pthread_mutex_lock(&worker->impl_->mutex_); 188 if (worker->status_ >= OK) { 189 // wait for the worker to finish 190 while (worker->status_ != OK) { 191 pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_); 192 } 193 // assign new status and release the working thread if needed 194 if (new_status != OK) { 195 worker->status_ = new_status; 196 pthread_cond_signal(&worker->impl_->condition_); 197 } 198 } 199 pthread_mutex_unlock(&worker->impl_->mutex_); 200} 201 202#endif // WEBP_USE_THREAD 203 204//------------------------------------------------------------------------------ 205 206static void Init(WebPWorker* const worker) { 207 memset(worker, 0, sizeof(*worker)); 208 worker->status_ = NOT_OK; 209} 210 211static int Sync(WebPWorker* const worker) { 212#ifdef WEBP_USE_THREAD 213 ChangeState(worker, OK); 214#endif 215 assert(worker->status_ <= OK); 216 return !worker->had_error; 217} 218 219static int Reset(WebPWorker* const worker) { 220 int ok = 1; 221 worker->had_error = 0; 222 if (worker->status_ < OK) { 223#ifdef WEBP_USE_THREAD 224 worker->impl_ = (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(*worker->impl_)); 225 if (worker->impl_ == NULL) { 226 return 0; 227 } 228 if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) { 229 goto Error; 230 } 231 if (pthread_cond_init(&worker->impl_->condition_, NULL)) { 232 pthread_mutex_destroy(&worker->impl_->mutex_); 233 goto Error; 234 } 235 pthread_mutex_lock(&worker->impl_->mutex_); 236 ok = !pthread_create(&worker->impl_->thread_, NULL, ThreadLoop, worker); 237 if (ok) worker->status_ = OK; 238 pthread_mutex_unlock(&worker->impl_->mutex_); 239 if (!ok) { 240 pthread_mutex_destroy(&worker->impl_->mutex_); 241 pthread_cond_destroy(&worker->impl_->condition_); 242 Error: 243 WebPSafeFree(worker->impl_); 244 worker->impl_ = NULL; 245 return 0; 246 } 247#else 248 worker->status_ = OK; 249#endif 250 } else if (worker->status_ > OK) { 251 ok = Sync(worker); 252 } 253 assert(!ok || (worker->status_ == OK)); 254 return ok; 255} 256 257static void Execute(WebPWorker* const worker) { 258 if (worker->hook != NULL) { 259 worker->had_error |= !worker->hook(worker->data1, worker->data2); 260 } 261} 262 263static void Launch(WebPWorker* const worker) { 264#ifdef WEBP_USE_THREAD 265 ChangeState(worker, WORK); 266#else 267 Execute(worker); 268#endif 269} 270 271static void End(WebPWorker* const worker) { 272#ifdef WEBP_USE_THREAD 273 if (worker->impl_ != NULL) { 274 ChangeState(worker, NOT_OK); 275 pthread_join(worker->impl_->thread_, NULL); 276 pthread_mutex_destroy(&worker->impl_->mutex_); 277 pthread_cond_destroy(&worker->impl_->condition_); 278 WebPSafeFree(worker->impl_); 279 worker->impl_ = NULL; 280 } 281#else 282 worker->status_ = NOT_OK; 283 assert(worker->impl_ == NULL); 284#endif 285 assert(worker->status_ == NOT_OK); 286} 287 288//------------------------------------------------------------------------------ 289 290static WebPWorkerInterface g_worker_interface = { 291 Init, Reset, Sync, Launch, Execute, End 292}; 293 294int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) { 295 if (winterface == NULL || 296 winterface->Init == NULL || winterface->Reset == NULL || 297 winterface->Sync == NULL || winterface->Launch == NULL || 298 winterface->Execute == NULL || winterface->End == NULL) { 299 return 0; 300 } 301 g_worker_interface = *winterface; 302 return 1; 303} 304 305const WebPWorkerInterface* WebPGetWorkerInterface(void) { 306 return &g_worker_interface; 307} 308 309//------------------------------------------------------------------------------ 310