1// Copyright 2011 Google Inc. All Rights Reserved.
2//
3// Use of this source code is governed by a BSD-style license
4// that can be found in the COPYING file in the root of the source
5// tree. An additional intellectual property rights grant can be found
6// in the file PATENTS. All contributing project authors may
7// be found in the AUTHORS file in the root of the source tree.
8// -----------------------------------------------------------------------------
9//
10// Multi-threaded worker
11//
12// Author: Skal (pascal.massimino@gmail.com)
13
14#include <assert.h>
15#include <string.h>   // for memset()
16#include "./thread.h"
17#include "./utils.h"
18
19#ifdef WEBP_USE_THREAD
20
21#if defined(_WIN32)
22
23#include <windows.h>
24typedef HANDLE pthread_t;
25typedef CRITICAL_SECTION pthread_mutex_t;
26typedef struct {
27  HANDLE waiting_sem_;
28  HANDLE received_sem_;
29  HANDLE signal_event_;
30} pthread_cond_t;
31
32#else  // !_WIN32
33
34#include <pthread.h>
35
36#endif  // _WIN32
37
// Platform-dependent part of a worker. It is allocated by Reset() and
// released by End().
struct WebPWorkerImpl {
  // Held by both the main thread and the worker thread while they inspect or
  // modify the worker's status_.
  pthread_mutex_t mutex_;
  // Used in both directions: the main thread signals new work or shutdown,
  // the worker thread signals that it went through its loop.
  pthread_cond_t  condition_;
  // The thread running ThreadLoop().
  pthread_t       thread_;
};
43
44#if defined(_WIN32)
45
46//------------------------------------------------------------------------------
47// simplistic pthread emulation layer
48
49#include <process.h>
50
// _beginthreadex requires a __stdcall entry point returning unsigned int.
#define THREADFN unsigned int __stdcall
// Converts the value returned by the thread function to the Win32 exit code
// type. The argument and the whole expansion are parenthesized so the macro
// behaves like a single expression whatever 'val' is and wherever it is used.
#define THREAD_RETURN(val) ((unsigned int)((DWORD_PTR)(val)))
54
55static int pthread_create(pthread_t* const thread, const void* attr,
56                          unsigned int (__stdcall *start)(void*), void* arg) {
57  (void)attr;
58  *thread = (pthread_t)_beginthreadex(NULL,   /* void *security */
59                                      0,      /* unsigned stack_size */
60                                      start,
61                                      arg,
62                                      0,      /* unsigned initflag */
63                                      NULL);  /* unsigned *thrdaddr */
64  if (*thread == NULL) return 1;
65  SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL);
66  return 0;
67}
68
69static int pthread_join(pthread_t thread, void** value_ptr) {
70  (void)value_ptr;
71  return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 ||
72          CloseHandle(thread) == 0);
73}
74
75// Mutex
76static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) {
77  (void)mutexattr;
78  InitializeCriticalSection(mutex);
79  return 0;
80}
81
82static int pthread_mutex_lock(pthread_mutex_t* const mutex) {
83  EnterCriticalSection(mutex);
84  return 0;
85}
86
87static int pthread_mutex_unlock(pthread_mutex_t* const mutex) {
88  LeaveCriticalSection(mutex);
89  return 0;
90}
91
92static int pthread_mutex_destroy(pthread_mutex_t* const mutex) {
93  DeleteCriticalSection(mutex);
94  return 0;
95}
96
97// Condition
98static int pthread_cond_destroy(pthread_cond_t* const condition) {
99  int ok = 1;
100  ok &= (CloseHandle(condition->waiting_sem_) != 0);
101  ok &= (CloseHandle(condition->received_sem_) != 0);
102  ok &= (CloseHandle(condition->signal_event_) != 0);
103  return !ok;
104}
105
106static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) {
107  (void)cond_attr;
108  condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
109  condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL);
110  condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL);
111  if (condition->waiting_sem_ == NULL ||
112      condition->received_sem_ == NULL ||
113      condition->signal_event_ == NULL) {
114    pthread_cond_destroy(condition);
115    return 1;
116  }
117  return 0;
118}
119
120static int pthread_cond_signal(pthread_cond_t* const condition) {
121  int ok = 1;
122  if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) {
123    // a thread is waiting in pthread_cond_wait: allow it to be notified
124    ok = SetEvent(condition->signal_event_);
125    // wait until the event is consumed so the signaler cannot consume
126    // the event via its own pthread_cond_wait.
127    ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) !=
128           WAIT_OBJECT_0);
129  }
130  return !ok;
131}
132
133static int pthread_cond_wait(pthread_cond_t* const condition,
134                             pthread_mutex_t* const mutex) {
135  int ok;
136  // note that there is a consumer available so the signal isn't dropped in
137  // pthread_cond_signal
138  if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL))
139    return 1;
140  // now unlock the mutex so pthread_cond_signal may be issued
141  pthread_mutex_unlock(mutex);
142  ok = (WaitForSingleObject(condition->signal_event_, INFINITE) ==
143        WAIT_OBJECT_0);
144  ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL);
145  pthread_mutex_lock(mutex);
146  return !ok;
147}
148
149#else  // !_WIN32
// Return type of a pthread start routine.
# define THREADFN void*
// The thread function's return value is passed through as is. The argument
// is parenthesized so the macro always expands to a single expression.
# define THREAD_RETURN(val) (val)
152#endif  // _WIN32
153
154//------------------------------------------------------------------------------
155
156static void Execute(WebPWorker* const worker);  // Forward declaration.
157
158static THREADFN ThreadLoop(void* ptr) {
159  WebPWorker* const worker = (WebPWorker*)ptr;
160  int done = 0;
161  while (!done) {
162    pthread_mutex_lock(&worker->impl_->mutex_);
163    while (worker->status_ == OK) {   // wait in idling mode
164      pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
165    }
166    if (worker->status_ == WORK) {
167      Execute(worker);
168      worker->status_ = OK;
169    } else if (worker->status_ == NOT_OK) {   // finish the worker
170      done = 1;
171    }
172    // signal to the main thread that we're done (for Sync())
173    pthread_cond_signal(&worker->impl_->condition_);
174    pthread_mutex_unlock(&worker->impl_->mutex_);
175  }
176  return THREAD_RETURN(NULL);    // Thread is finished
177}
178
179// main thread state control
180static void ChangeState(WebPWorker* const worker,
181                        WebPWorkerStatus new_status) {
182  // No-op when attempting to change state on a thread that didn't come up.
183  // Checking status_ without acquiring the lock first would result in a data
184  // race.
185  if (worker->impl_ == NULL) return;
186
187  pthread_mutex_lock(&worker->impl_->mutex_);
188  if (worker->status_ >= OK) {
189    // wait for the worker to finish
190    while (worker->status_ != OK) {
191      pthread_cond_wait(&worker->impl_->condition_, &worker->impl_->mutex_);
192    }
193    // assign new status and release the working thread if needed
194    if (new_status != OK) {
195      worker->status_ = new_status;
196      pthread_cond_signal(&worker->impl_->condition_);
197    }
198  }
199  pthread_mutex_unlock(&worker->impl_->mutex_);
200}
201
202#endif  // WEBP_USE_THREAD
203
204//------------------------------------------------------------------------------
205
206static void Init(WebPWorker* const worker) {
207  memset(worker, 0, sizeof(*worker));
208  worker->status_ = NOT_OK;
209}
210
211static int Sync(WebPWorker* const worker) {
212#ifdef WEBP_USE_THREAD
213  ChangeState(worker, OK);
214#endif
215  assert(worker->status_ <= OK);
216  return !worker->had_error;
217}
218
219static int Reset(WebPWorker* const worker) {
220  int ok = 1;
221  worker->had_error = 0;
222  if (worker->status_ < OK) {
223#ifdef WEBP_USE_THREAD
224    worker->impl_ = (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(*worker->impl_));
225    if (worker->impl_ == NULL) {
226      return 0;
227    }
228    if (pthread_mutex_init(&worker->impl_->mutex_, NULL)) {
229      goto Error;
230    }
231    if (pthread_cond_init(&worker->impl_->condition_, NULL)) {
232      pthread_mutex_destroy(&worker->impl_->mutex_);
233      goto Error;
234    }
235    pthread_mutex_lock(&worker->impl_->mutex_);
236    ok = !pthread_create(&worker->impl_->thread_, NULL, ThreadLoop, worker);
237    if (ok) worker->status_ = OK;
238    pthread_mutex_unlock(&worker->impl_->mutex_);
239    if (!ok) {
240      pthread_mutex_destroy(&worker->impl_->mutex_);
241      pthread_cond_destroy(&worker->impl_->condition_);
242 Error:
243      WebPSafeFree(worker->impl_);
244      worker->impl_ = NULL;
245      return 0;
246    }
247#else
248    worker->status_ = OK;
249#endif
250  } else if (worker->status_ > OK) {
251    ok = Sync(worker);
252  }
253  assert(!ok || (worker->status_ == OK));
254  return ok;
255}
256
257static void Execute(WebPWorker* const worker) {
258  if (worker->hook != NULL) {
259    worker->had_error |= !worker->hook(worker->data1, worker->data2);
260  }
261}
262
263static void Launch(WebPWorker* const worker) {
264#ifdef WEBP_USE_THREAD
265  ChangeState(worker, WORK);
266#else
267  Execute(worker);
268#endif
269}
270
271static void End(WebPWorker* const worker) {
272#ifdef WEBP_USE_THREAD
273  if (worker->impl_ != NULL) {
274    ChangeState(worker, NOT_OK);
275    pthread_join(worker->impl_->thread_, NULL);
276    pthread_mutex_destroy(&worker->impl_->mutex_);
277    pthread_cond_destroy(&worker->impl_->condition_);
278    WebPSafeFree(worker->impl_);
279    worker->impl_ = NULL;
280  }
281#else
282  worker->status_ = NOT_OK;
283  assert(worker->impl_ == NULL);
284#endif
285  assert(worker->status_ == NOT_OK);
286}
287
288//------------------------------------------------------------------------------
289
290static WebPWorkerInterface g_worker_interface = {
291  Init, Reset, Sync, Launch, Execute, End
292};
293
294int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) {
295  if (winterface == NULL ||
296      winterface->Init == NULL || winterface->Reset == NULL ||
297      winterface->Sync == NULL || winterface->Launch == NULL ||
298      winterface->Execute == NULL || winterface->End == NULL) {
299    return 0;
300  }
301  g_worker_interface = *winterface;
302  return 1;
303}
304
305const WebPWorkerInterface* WebPGetWorkerInterface(void) {
306  return &g_worker_interface;
307}
308
309//------------------------------------------------------------------------------
310