// thread.c revision 8b720228d581a84fd173b6dcb2fa295b59db489a
// Copyright 2011 Google Inc. All Rights Reserved.
//
// Use of this source code is governed by a BSD-style license
// that can be found in the COPYING file in the root of the source
// tree. An additional intellectual property rights grant can be found
// in the file PATENTS. All contributing project authors may
// be found in the AUTHORS file in the root of the source tree.
// -----------------------------------------------------------------------------
//
// Multi-threaded worker
//
// Author: Skal (pascal.massimino@gmail.com)
13
14#include <assert.h>
15#include <string.h>   // for memset()
16#include "./thread.h"
17
18#ifdef WEBP_USE_THREAD
19
20#if defined(_WIN32)
21
22//------------------------------------------------------------------------------
23// simplistic pthread emulation layer
24
25#include <process.h>
26
// _beginthreadex requires a __stdcall entry point returning unsigned int.
#define THREADFN unsigned int __stdcall
// Converts a pointer-like return value to the thread's unsigned int exit code.
// The argument and the whole expansion are parenthesized so that expression
// arguments are cast as a unit and the result composes safely with operators.
#define THREAD_RETURN(val) ((unsigned int)((DWORD_PTR)(val)))
30
31static int pthread_create(pthread_t* const thread, const void* attr,
32                          unsigned int (__stdcall *start)(void*), void* arg) {
33  (void)attr;
34  *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
35                                      0,      /* unsigned stack_size */
36                                      start,
37                                      arg,
38                                      0,      /* unsigned initflag */
39                                      NULL);  /* unsigned *thrdaddr */
40  if (*thread == NULL) return 1;
41  SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
42  return 0;
43}
44
45static int pthread_join(pthread_t thread, void** value_ptr) {
46  (void)value_ptr;
47  return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
48          CloseHandle(thread) == 0);
49}
50
51// Mutex
52static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
53  (void)mutexattr;
54  InitializeCriticalSection(mutex);
55  return 0;
56}
57
58static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
59  EnterCriticalSection(mutex);
60  return 0;
61}
62
63static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
64  LeaveCriticalSection(mutex);
65  return 0;
66}
67
68static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
69  DeleteCriticalSection(mutex);
70  return 0;
71}
72
73// Condition
74static int pthread_cond_destroy(pthread_cond_t* const condition) {
75  int ok = 1;
76  ok &= (CloseHandle(condition->waiting_sem_) != 0);
77  ok &= (CloseHandle(condition->received_sem_) != 0);
78  ok &= (CloseHandle(condition->signal_event_) != 0);
79  return !ok;
80}
81
82static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
83  (void)cond_attr;
84  condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
85  condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
86  condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
87  if (condition->waiting_sem_ == NULL ||
88      condition->received_sem_ == NULL ||
89      condition->signal_event_ == NULL) {
90    pthread_cond_destroy(condition);
91    return 1;
92  }
93  return 0;
94}
95
96static int pthread_cond_signal(pthread_cond_t* const condition) {
97  int ok = 1;
98  if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
99    // a thread is waiting in pthread_cond_wait: allow it to be notified
100    ok = SetEvent(condition->signal_event_);
101    // wait until the event is consumed so the signaler cannot consume
102    // the event via its own pthread_cond_wait.
103    ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
104           WAIT_OBJECT_0);
105  }
106  return !ok;
107}
108
109static int pthread_cond_wait(pthread_cond_t* const condition,
110                             pthread_mutex_t* const mutex) {
111  int ok;
112  // note that there is a consumer available so the signal isn't dropped in
113  // pthread_cond_signal
114  if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL))
115    return 1;
116  // now unlock the mutex so pthread_cond_signal may be issued
117  pthread_mutex_unlock(mutex);
118  ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
119        WAIT_OBJECT_0);
120  ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
121  pthread_mutex_lock(mutex);
122  return !ok;
123}
124
125#else  // !_WIN32
// Thread entry points return void* here, so the value is passed through
// unchanged. It is parenthesized so that an expression argument cannot
// re-associate with operators surrounding the macro invocation.
# define THREADFN void*
# define THREAD_RETURN(val) (val)
128#endif  // _WIN32
129
130//------------------------------------------------------------------------------
131
132static THREADFN ThreadLoop(void* ptr) {
133  WebPWorker* const worker = (WebPWorker*)ptr;
134  int done = 0;
135  while (!done) {
136    pthread_mutex_lock(&worker->mutex_);
137    while (worker->status_ == OK) {   // wait in idling mode
138      pthread_cond_wait(&worker->condition_, &worker->mutex_);
139    }
140    if (worker->status_ == WORK) {
141      WebPWorkerExecute(worker);
142      worker->status_ = OK;
143    } else if (worker->status_ == NOT_OK) {   // finish the worker
144      done = 1;
145    }
146    // signal to the main thread that we're done (for Sync())
147    pthread_cond_signal(&worker->condition_);
148    pthread_mutex_unlock(&worker->mutex_);
149  }
150  return THREAD_RETURN(NULL);    // Thread is finished
151}
152
153// main thread state control
154static void ChangeState(WebPWorker* const worker,
155                        WebPWorkerStatus new_status) {
156  // no-op when attempting to change state on a thread that didn't come up
157  if (worker->status_ < OK) return;
158
159  pthread_mutex_lock(&worker->mutex_);
160  // wait for the worker to finish
161  while (worker->status_ != OK) {
162    pthread_cond_wait(&worker->condition_, &worker->mutex_);
163  }
164  // assign new status and release the working thread if needed
165  if (new_status != OK) {
166    worker->status_ = new_status;
167    pthread_cond_signal(&worker->condition_);
168  }
169  pthread_mutex_unlock(&worker->mutex_);
170}
171
172#endif  // WEBP_USE_THREAD
173
174//------------------------------------------------------------------------------
175
176void WebPWorkerInit(WebPWorker* const worker) {
177  memset(worker, 0, sizeof(*worker));
178  worker->status_ = NOT_OK;
179}
180
181int WebPWorkerSync(WebPWorker* const worker) {
182#ifdef WEBP_USE_THREAD
183  ChangeState(worker, OK);
184#endif
185  assert(worker->status_ <= OK);
186  return !worker->had_error;
187}
188
189int WebPWorkerReset(WebPWorker* const worker) {
190  int ok = 1;
191  worker->had_error = 0;
192  if (worker->status_ < OK) {
193#ifdef WEBP_USE_THREAD
194    if (pthread_mutex_init(&worker->mutex_, NULL) ||
195        pthread_cond_init(&worker->condition_, NULL)) {
196      return 0;
197    }
198    pthread_mutex_lock(&worker->mutex_);
199    ok = !pthread_create(&worker->thread_, NULL, ThreadLoop, worker);
200    if (ok) worker->status_ = OK;
201    pthread_mutex_unlock(&worker->mutex_);
202#else
203    worker->status_ = OK;
204#endif
205  } else if (worker->status_ > OK) {
206    ok = WebPWorkerSync(worker);
207  }
208  assert(!ok || (worker->status_ == OK));
209  return ok;
210}
211
212void WebPWorkerExecute(WebPWorker* const worker) {
213  if (worker->hook != NULL) {
214    worker->had_error |= !worker->hook(worker->data1, worker->data2);
215  }
216}
217
218void WebPWorkerLaunch(WebPWorker* const worker) {
219#ifdef WEBP_USE_THREAD
220  ChangeState(worker, WORK);
221#else
222  WebPWorkerExecute(worker);
223#endif
224}
225
226void WebPWorkerEnd(WebPWorker* const worker) {
227  if (worker->status_ >= OK) {
228#ifdef WEBP_USE_THREAD
229    ChangeState(worker, NOT_OK);
230    pthread_join(worker->thread_, NULL);
231    pthread_mutex_destroy(&worker->mutex_);
232    pthread_cond_destroy(&worker->condition_);
233#else
234    worker->status_ = NOT_OK;
235#endif
236  }
237  assert(worker->status_ == NOT_OK);
238}
239
240//------------------------------------------------------------------------------
241
242