thread.c revision 1e7bf8805bd030c19924a5306837ecd72c295751
1// Copyright 2011 Google Inc. All Rights Reserved.
2//
3// This code is licensed under the same terms as WebM:
4//  Software License Agreement:  http://www.webmproject.org/license/software/
5//  Additional IP Rights Grant:  http://www.webmproject.org/license/additional/
6// -----------------------------------------------------------------------------
7//
8// Multi-threaded worker
9//
10// Author: Skal (pascal.massimino@gmail.com)
11
12#include <assert.h>
13#include <string.h>   // for memset()
14#include "./thread.h"
15
16#if defined(__cplusplus) || defined(c_plusplus)
17extern "C" {
18#endif
19
20#ifdef WEBP_USE_THREAD
21
22#if defined(_WIN32)
23
24//------------------------------------------------------------------------------
25// simplistic pthread emulation layer
26
27#include <process.h>
28
29// _beginthreadex requires __stdcall
30#define THREADFN unsigned int __stdcall
31#define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val)
32
33static int pthread_create(pthread_t* const thread, const void* attr,
34                          unsigned int (__stdcall *start)(void*), void* arg) {
35  (void)attr;
36  *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
37                                      0,      /* unsigned stack_size */
38                                      start,
39                                      arg,
40                                      0,      /* unsigned initflag */
41                                      NULL);  /* unsigned *thrdaddr */
42  if (*thread == NULL) return 1;
43  SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
44  return 0;
45}
46
47static int pthread_join(pthread_t thread, void** value_ptr) {
48  (void)value_ptr;
49  return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
50          CloseHandle(thread) == 0);
51}
52
53// Mutex
54static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
55  (void)mutexattr;
56  InitializeCriticalSection(mutex);
57  return 0;
58}
59
60static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
61  EnterCriticalSection(mutex);
62  return 0;
63}
64
65static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
66  LeaveCriticalSection(mutex);
67  return 0;
68}
69
70static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
71  DeleteCriticalSection(mutex);
72  return 0;
73}
74
75// Condition
76static int pthread_cond_destroy(pthread_cond_t* const condition) {
77  int ok = 1;
78  ok &= (CloseHandle(condition->waiting_sem_) != 0);
79  ok &= (CloseHandle(condition->received_sem_) != 0);
80  ok &= (CloseHandle(condition->signal_event_) != 0);
81  return !ok;
82}
83
84static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
85  (void)cond_attr;
86  condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
87  condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
88  condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
89  if (condition->waiting_sem_ == NULL ||
90      condition->received_sem_ == NULL ||
91      condition->signal_event_ == NULL) {
92    pthread_cond_destroy(condition);
93    return 1;
94  }
95  return 0;
96}
97
98static int pthread_cond_signal(pthread_cond_t* const condition) {
99  int ok = 1;
100  if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
101    // a thread is waiting in pthread_cond_wait: allow it to be notified
102    ok = SetEvent(condition->signal_event_);
103    // wait until the event is consumed so the signaler cannot consume
104    // the event via its own pthread_cond_wait.
105    ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
106           WAIT_OBJECT_0);
107  }
108  return !ok;
109}
110
111static int pthread_cond_wait(pthread_cond_t* const condition,
112                             pthread_mutex_t* const mutex) {
113  int ok;
114  // note that there is a consumer available so the signal isn't dropped in
115  // pthread_cond_signal
116  if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL))
117    return 1;
118  // now unlock the mutex so pthread_cond_signal may be issued
119  pthread_mutex_unlock(mutex);
120  ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
121        WAIT_OBJECT_0);
122  ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
123  pthread_mutex_lock(mutex);
124  return !ok;
125}
126
127#else  // _WIN32
128# define THREADFN void*
129# define THREAD_RETURN(val) val
130#endif
131
132//------------------------------------------------------------------------------
133
134static THREADFN WebPWorkerThreadLoop(void *ptr) {    // thread loop
135  WebPWorker* const worker = (WebPWorker*)ptr;
136  int done = 0;
137  while (!done) {
138    pthread_mutex_lock(&worker->mutex_);
139    while (worker->status_ == OK) {   // wait in idling mode
140      pthread_cond_wait(&worker->condition_, &worker->mutex_);
141    }
142    if (worker->status_ == WORK) {
143      if (worker->hook) {
144        worker->had_error |= !worker->hook(worker->data1, worker->data2);
145      }
146      worker->status_ = OK;
147    } else if (worker->status_ == NOT_OK) {   // finish the worker
148      done = 1;
149    }
150    // signal to the main thread that we're done (for Sync())
151    pthread_cond_signal(&worker->condition_);
152    pthread_mutex_unlock(&worker->mutex_);
153  }
154  return THREAD_RETURN(NULL);    // Thread is finished
155}
156
157// main thread state control
158static void WebPWorkerChangeState(WebPWorker* const worker,
159                                  WebPWorkerStatus new_status) {
160  // no-op when attempting to change state on a thread that didn't come up
161  if (worker->status_ < OK) return;
162
163  pthread_mutex_lock(&worker->mutex_);
164  // wait for the worker to finish
165  while (worker->status_ != OK) {
166    pthread_cond_wait(&worker->condition_, &worker->mutex_);
167  }
168  // assign new status and release the working thread if needed
169  if (new_status != OK) {
170    worker->status_ = new_status;
171    pthread_cond_signal(&worker->condition_);
172  }
173  pthread_mutex_unlock(&worker->mutex_);
174}
175
176#endif
177
178//------------------------------------------------------------------------------
179
180void WebPWorkerInit(WebPWorker* const worker) {
181  memset(worker, 0, sizeof(*worker));
182  worker->status_ = NOT_OK;
183}
184
185int WebPWorkerSync(WebPWorker* const worker) {
186#ifdef WEBP_USE_THREAD
187  WebPWorkerChangeState(worker, OK);
188#endif
189  assert(worker->status_ <= OK);
190  return !worker->had_error;
191}
192
193int WebPWorkerReset(WebPWorker* const worker) {
194  int ok = 1;
195  worker->had_error = 0;
196  if (worker->status_ < OK) {
197#ifdef WEBP_USE_THREAD
198    if (pthread_mutex_init(&worker->mutex_, NULL) ||
199        pthread_cond_init(&worker->condition_, NULL)) {
200      return 0;
201    }
202    pthread_mutex_lock(&worker->mutex_);
203    ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker);
204    if (ok) worker->status_ = OK;
205    pthread_mutex_unlock(&worker->mutex_);
206#else
207    worker->status_ = OK;
208#endif
209  } else if (worker->status_ > OK) {
210    ok = WebPWorkerSync(worker);
211  }
212  assert(!ok || (worker->status_ == OK));
213  return ok;
214}
215
216void WebPWorkerLaunch(WebPWorker* const worker) {
217#ifdef WEBP_USE_THREAD
218  WebPWorkerChangeState(worker, WORK);
219#else
220  if (worker->hook)
221    worker->had_error |= !worker->hook(worker->data1, worker->data2);
222#endif
223}
224
225void WebPWorkerEnd(WebPWorker* const worker) {
226  if (worker->status_ >= OK) {
227#ifdef WEBP_USE_THREAD
228    WebPWorkerChangeState(worker, NOT_OK);
229    pthread_join(worker->thread_, NULL);
230    pthread_mutex_destroy(&worker->mutex_);
231    pthread_cond_destroy(&worker->condition_);
232#else
233    worker->status_ = NOT_OK;
234#endif
235  }
236  assert(worker->status_ == NOT_OK);
237}
238
239//------------------------------------------------------------------------------
240
241#if defined(__cplusplus) || defined(c_plusplus)
242}    // extern "C"
243#endif
244