thread.c revision a2415724fb3466168b2af5b08bd94ba732c0e753
1// Copyright 2011 Google Inc. All Rights Reserved.
2//
3// This code is licensed under the same terms as WebM:
4//  Software License Agreement:  http://www.webmproject.org/license/software/
5//  Additional IP Rights Grant:  http://www.webmproject.org/license/additional/
6// -----------------------------------------------------------------------------
7//
8// Multi-threaded worker
9//
10// Author: Skal (pascal.massimino@gmail.com)
11
12#ifdef HAVE_CONFIG_H
13#include "config.h"
14#endif
15
16#include <assert.h>
17#include <string.h>   // for memset()
18#include "./thread.h"
19
20#if defined(__cplusplus) || defined(c_plusplus)
21extern "C" {
22#endif
23
24#ifdef WEBP_USE_THREAD
25
26#if defined(_WIN32)
27
28//------------------------------------------------------------------------------
29// simplistic pthread emulation layer
30
31#include <process.h>
32
// _beginthreadex requires a thread entry point that uses the __stdcall
// calling convention and returns an unsigned int.
#define THREADFN unsigned int __stdcall
// Converts a pointer-sized value into the unsigned int returned by the thread
// function. The argument and the whole expansion are parenthesized so that
// compound expressions are converted as a unit.
#define THREAD_RETURN(val) ((unsigned int)((DWORD_PTR)(val)))
36
37static int pthread_create(pthread_t* const thread, const void* attr,
38                          unsigned int (__stdcall *start)(void*), void* arg) {
39  (void)attr;
40  *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
41                                      0,      /* unsigned stack_size */
42                                      start,
43                                      arg,
44                                      0,      /* unsigned initflag */
45                                      NULL);  /* unsigned *thrdaddr */
46  if (*thread == NULL) return 1;
47  SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
48  return 0;
49}
50
51static int pthread_join(pthread_t thread, void** value_ptr) {
52  (void)value_ptr;
53  return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
54          CloseHandle(thread) == 0);
55}
56
57// Mutex
58static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
59  (void)mutexattr;
60  InitializeCriticalSection(mutex);
61  return 0;
62}
63
64static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
65  EnterCriticalSection(mutex);
66  return 0;
67}
68
69static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
70  LeaveCriticalSection(mutex);
71  return 0;
72}
73
74static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
75  DeleteCriticalSection(mutex);
76  return 0;
77}
78
79// Condition
80static int pthread_cond_destroy(pthread_cond_t* const condition) {
81  int ok = 1;
82  ok &= (CloseHandle(condition->waiting_sem_) != 0);
83  ok &= (CloseHandle(condition->received_sem_) != 0);
84  ok &= (CloseHandle(condition->signal_event_) != 0);
85  return !ok;
86}
87
88static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
89  (void)cond_attr;
90  condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
91  condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
92  condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
93  if (condition->waiting_sem_ == NULL ||
94      condition->received_sem_ == NULL ||
95      condition->signal_event_ == NULL) {
96    pthread_cond_destroy(condition);
97    return 1;
98  }
99  return 0;
100}
101
102static int pthread_cond_signal(pthread_cond_t* const condition) {
103  int ok = 1;
104  if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
105    // a thread is waiting in pthread_cond_wait: allow it to be notified
106    ok = SetEvent(condition->signal_event_);
107    // wait until the event is consumed so the signaler cannot consume
108    // the event via its own pthread_cond_wait.
109    ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
110           WAIT_OBJECT_0);
111  }
112  return !ok;
113}
114
115static int pthread_cond_wait(pthread_cond_t* const condition,
116                             pthread_mutex_t* const mutex) {
117  int ok;
118  // note that there is a consumer available so the signal isn't dropped in
119  // pthread_cond_signal
120  if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL))
121    return 1;
122  // now unlock the mutex so pthread_cond_signal may be issued
123  pthread_mutex_unlock(mutex);
124  ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
125        WAIT_OBJECT_0);
126  ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
127  pthread_mutex_lock(mutex);
128  return !ok;
129}
130
131#else  // _WIN32
// With native pthreads the thread function returns void* and the value is
// passed through unchanged (parenthesized so that a compound expression is
// kept as a unit).
# define THREADFN void*
# define THREAD_RETURN(val) (val)
134#endif
135
136//------------------------------------------------------------------------------
137
138static THREADFN WebPWorkerThreadLoop(void *ptr) {    // thread loop
139  WebPWorker* const worker = (WebPWorker*)ptr;
140  int done = 0;
141  while (!done) {
142    pthread_mutex_lock(&worker->mutex_);
143    while (worker->status_ == OK) {   // wait in idling mode
144      pthread_cond_wait(&worker->condition_, &worker->mutex_);
145    }
146    if (worker->status_ == WORK) {
147      if (worker->hook) {
148        worker->had_error |= !worker->hook(worker->data1, worker->data2);
149      }
150      worker->status_ = OK;
151    } else if (worker->status_ == NOT_OK) {   // finish the worker
152      done = 1;
153    }
154    // signal to the main thread that we're done (for Sync())
155    pthread_cond_signal(&worker->condition_);
156    pthread_mutex_unlock(&worker->mutex_);
157  }
158  return THREAD_RETURN(NULL);    // Thread is finished
159}
160
161// main thread state control
162static void WebPWorkerChangeState(WebPWorker* const worker,
163                                  WebPWorkerStatus new_status) {
164  // no-op when attempting to change state on a thread that didn't come up
165  if (worker->status_ < OK) return;
166
167  pthread_mutex_lock(&worker->mutex_);
168  // wait for the worker to finish
169  while (worker->status_ != OK) {
170    pthread_cond_wait(&worker->condition_, &worker->mutex_);
171  }
172  // assign new status and release the working thread if needed
173  if (new_status != OK) {
174    worker->status_ = new_status;
175    pthread_cond_signal(&worker->condition_);
176  }
177  pthread_mutex_unlock(&worker->mutex_);
178}
179
180#endif
181
182//------------------------------------------------------------------------------
183
184void WebPWorkerInit(WebPWorker* const worker) {
185  memset(worker, 0, sizeof(*worker));
186  worker->status_ = NOT_OK;
187}
188
189int WebPWorkerSync(WebPWorker* const worker) {
190#ifdef WEBP_USE_THREAD
191  WebPWorkerChangeState(worker, OK);
192#endif
193  assert(worker->status_ <= OK);
194  return !worker->had_error;
195}
196
197int WebPWorkerReset(WebPWorker* const worker) {
198  int ok = 1;
199  worker->had_error = 0;
200  if (worker->status_ < OK) {
201#ifdef WEBP_USE_THREAD
202    if (pthread_mutex_init(&worker->mutex_, NULL) ||
203        pthread_cond_init(&worker->condition_, NULL)) {
204      return 0;
205    }
206    pthread_mutex_lock(&worker->mutex_);
207    ok = !pthread_create(&worker->thread_, NULL, WebPWorkerThreadLoop, worker);
208    if (ok) worker->status_ = OK;
209    pthread_mutex_unlock(&worker->mutex_);
210#else
211    worker->status_ = OK;
212#endif
213  } else if (worker->status_ > OK) {
214    ok = WebPWorkerSync(worker);
215  }
216  assert(!ok || (worker->status_ == OK));
217  return ok;
218}
219
220void WebPWorkerLaunch(WebPWorker* const worker) {
221#ifdef WEBP_USE_THREAD
222  WebPWorkerChangeState(worker, WORK);
223#else
224  if (worker->hook)
225    worker->had_error |= !worker->hook(worker->data1, worker->data2);
226#endif
227}
228
229void WebPWorkerEnd(WebPWorker* const worker) {
230  if (worker->status_ >= OK) {
231#ifdef WEBP_USE_THREAD
232    WebPWorkerChangeState(worker, NOT_OK);
233    pthread_join(worker->thread_, NULL);
234    pthread_mutex_destroy(&worker->mutex_);
235    pthread_cond_destroy(&worker->condition_);
236#else
237    worker->status_ = NOT_OK;
238#endif
239  }
240  assert(worker->status_ == NOT_OK);
241}
242
243//------------------------------------------------------------------------------
244
245#if defined(__cplusplus) || defined(c_plusplus)
246}    // extern "C"
247#endif
248