1/*
2  This file is part of ThreadSanitizer, a dynamic data race detector.
3
4  Copyright (C) 2008-2009 Google Inc
5     opensource@google.com
6
7  This program is free software; you can redistribute it and/or
8  modify it under the terms of the GNU General Public License as
9  published by the Free Software Foundation; either version 2 of the
10  License, or (at your option) any later version.
11
12  This program is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  General Public License for more details.
16
17  You should have received a copy of the GNU General Public License
18  along with this program; if not, write to the Free Software
19  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
20  02111-1307, USA.
21
22  The GNU General Public License is contained in the file COPYING.
23*/
24
25// Author: Konstantin Serebryany <opensource@google.com>
26//
27// Here we define a few simple classes that wrap threading primitives.
28//
29// We need this to create unit tests for ThreadSanitizer (or similar tools)
30// that will work with different threading frameworks.
31//
// Note that some of the methods defined here are annotated with
33// ANNOTATE_* macros defined in dynamic_annotations.h.
34//
35// DISCLAIMER: the classes defined in this header file
36// are NOT intended for general use -- only for unit tests.
37
38#ifndef THREAD_WRAPPERS_H
39#define THREAD_WRAPPERS_H
40
#include <assert.h>
#include <limits.h>   // INT_MAX
#include <queue>
#include <stdint.h>   // intptr_t
#include <stdio.h>
#include <string>
#include <time.h>
47
48#include "dynamic_annotations.h"
49
50using namespace std;
51
52#ifdef NDEBUG
53# error "Pleeease, do not define NDEBUG"
54#endif
55
// CHECK(x): the assertion macro used throughout this file.
// On WIN32 builds a failed check prints the enclosing function name, the
// file and line, and the text of the expression to stderr, then calls exit(1).
// NOTE(review): exit() is declared in <stdlib.h>, which this file does not
// include directly -- presumably it arrives through another header; confirm.
#ifdef WIN32
# define CHECK(x) do { if (!(x)) { \
   fprintf(stderr, "Assertion failed: %s (%s:%d) %s\n", \
          __FUNCTION__, __FILE__, __LINE__, #x); \
   exit(1); }} while (0)
#else
// Everywhere else CHECK is simply assert; this file refuses to compile with
// NDEBUG defined, so the check is never compiled out.
# define CHECK assert
#endif
64
65/// Just a boolean condition. Used by Mutex::LockWhen and similar.
/// Just a boolean condition. Used by Mutex::LockWhen and similar.
///
/// Wraps a predicate function (with zero or one pointer argument) so that it
/// can be re-evaluated later through Eval().
class Condition {
 public:
  /// Kept for source compatibility with code that names this type.
  typedef bool (*func_t)(void*);

  /// Builds a condition from a one-argument predicate and its argument.
  /// \param func  predicate to evaluate; receives \p arg on every Eval().
  /// \param arg   pointer handed to \p func; it is stored, not copied, so the
  ///              pointee must stay alive for as long as Eval() may be called.
  template <typename T>
  Condition(bool (*func)(T*), T* arg)
  : func_(reinterpret_cast<generic_func_t>(func)),
    arg_(arg),
    invoke_(&Condition::InvokeWithArg<T>) {}

  /// Builds a condition from a predicate that takes no arguments.
  Condition(bool (*func)())
  : func_(reinterpret_cast<generic_func_t>(func)),
    arg_(NULL),
    invoke_(&Condition::InvokeNoArg) {}

  /// Calls the stored predicate and returns its result.
  bool Eval() const { return invoke_(func_, arg_); }
 private:
  // The predicate is stored type-erased as a generic function pointer and is
  // always converted back to its exact original type before being called.
  // Calling a function through a pointer of a different function type is
  // undefined behaviour, so the trampolines below remember the real type.
  typedef void (*generic_func_t)();
  typedef bool (*invoker_t)(generic_func_t, void*);

  // Trampoline for bool(*)(T*): restores both the function and argument types.
  template <typename T>
  static bool InvokeWithArg(generic_func_t func, void *arg) {
    return reinterpret_cast<bool (*)(T*)>(func)(static_cast<T*>(arg));
  }

  // Trampoline for bool(*)(): the stored argument is ignored.
  static bool InvokeNoArg(generic_func_t func, void *) {
    return reinterpret_cast<bool (*)()>(func)();
  }

  generic_func_t func_;
  void *arg_;
  invoker_t invoke_;
};
82
// Define platform-specific types, constants and functions {{{1
// Declarations only -- this header does not define these two functions.
// NOTE(review): presumably the platform-specific header included below
// provides the definitions; confirm there.
static int AtomicIncrement(volatile int *value, int increment);
static int GetTimeInMs();

// Forward declarations of the threading primitive classes. Mutex and
// MyThread are used by the classes defined later in this file.
class CondVar;
class MyThread;
class Mutex;
90//}}}
91
// Include platform-specific header with declarations.
93#ifndef WIN32
94// Include pthread primitives (Linux, Mac)
95#include "thread_wrappers_pthread.h"
96#else
97// Include Windows primitives
98#include "thread_wrappers_win.h"
99#endif
100
// Define cross-platform types and synchronization primitives {{{1
102/// Just a message queue.
103class ProducerConsumerQueue {
104 public:
105  ProducerConsumerQueue(int unused) {
106    //ANNOTATE_PCQ_CREATE(this);
107  }
108  ~ProducerConsumerQueue() {
109    CHECK(q_.empty());
110    //ANNOTATE_PCQ_DESTROY(this);
111  }
112
113  // Put.
114  void Put(void *item) {
115    mu_.Lock();
116      q_.push(item);
117      ANNOTATE_CONDVAR_SIGNAL(&mu_); // LockWhen in Get()
118      //ANNOTATE_PCQ_PUT(this);
119    mu_.Unlock();
120  }
121
122  // Get.
123  // Blocks if the queue is empty.
124  void *Get() {
125    mu_.LockWhen(Condition(IsQueueNotEmpty, &q_));
126      void * item;
127      bool ok = TryGetInternal(&item);
128      CHECK(ok);
129    mu_.Unlock();
130    return item;
131  }
132
133  // If queue is not empty,
134  // remove an element from queue, put it into *res and return true.
135  // Otherwise return false.
136  bool TryGet(void **res) {
137    mu_.Lock();
138      bool ok = TryGetInternal(res);
139    mu_.Unlock();
140    return ok;
141  }
142
143 private:
144  Mutex mu_;
145  std::queue<void*> q_; // protected by mu_
146
147  // Requires mu_
148  bool TryGetInternal(void ** item_ptr) {
149    if (q_.empty())
150      return false;
151    *item_ptr = q_.front();
152    q_.pop();
153    //ANNOTATE_PCQ_GET(this);
154    return true;
155  }
156
157  static bool IsQueueNotEmpty(std::queue<void*> * queue) {
158     return !queue->empty();
159  }
160};
161
162/// Function pointer with zero, one or two parameters.
163struct Closure {
164  typedef void (*F0)();
165  typedef void (*F1)(void *arg1);
166  typedef void (*F2)(void *arg1, void *arg2);
167  int  n_params;
168  void *f;
169  void *param1;
170  void *param2;
171
172  void Execute() {
173    if (n_params == 0) {
174      (F0(f))();
175    } else if (n_params == 1) {
176      (F1(f))(param1);
177    } else {
178      CHECK(n_params == 2);
179      (F2(f))(param1, param2);
180    }
181    delete this;
182  }
183};
184
185static Closure *NewCallback(void (*f)()) {
186  Closure *res = new Closure;
187  res->n_params = 0;
188  res->f = (void*)(f);
189  res->param1 = NULL;
190  res->param2 = NULL;
191  return res;
192}
193
194template <class P1>
195Closure *NewCallback(void (*f)(P1), P1 p1) {
196  CHECK(sizeof(P1) <= sizeof(void*));
197  Closure *res = new Closure;
198  res->n_params = 1;
199  res->f = (void*)(f);
200  res->param1 = (void*)(intptr_t)p1;
201  res->param2 = NULL;
202  return res;
203}
204
205template <class P1, class P2>
206Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) {
207  CHECK(sizeof(P1) <= sizeof(void*));
208  CHECK(sizeof(P2) <= sizeof(void*));
209  Closure *res = new Closure;
210  res->n_params = 2;
211  res->f = (void*)(f);
212  res->param1 = (void*)p1;
213  res->param2 = (void*)p2;
214  return res;
215}
216
217/*! A thread pool that uses ProducerConsumerQueue.
218  Usage:
219  {
220    ThreadPool pool(n_workers);
221    pool.StartWorkers();
222    pool.Add(NewCallback(func_with_no_args));
223    pool.Add(NewCallback(func_with_one_arg, arg));
224    pool.Add(NewCallback(func_with_two_args, arg1, arg2));
225    ... // more calls to pool.Add()
226
227    // the ~ThreadPool() is called: we wait workers to finish
228    // and then join all threads in the pool.
229  }
230*/
231class ThreadPool {
232 public:
233  //! Create n_threads threads, but do not start.
234  explicit ThreadPool(int n_threads)
235    : queue_(INT_MAX) {
236    for (int i = 0; i < n_threads; i++) {
237      MyThread *thread = new MyThread(&ThreadPool::Worker, this);
238      workers_.push_back(thread);
239    }
240  }
241
242  //! Start all threads.
243  void StartWorkers() {
244    for (size_t i = 0; i < workers_.size(); i++) {
245      workers_[i]->Start();
246    }
247  }
248
249  //! Add a closure.
250  void Add(Closure *closure) {
251    queue_.Put(closure);
252  }
253
254  int num_threads() { return workers_.size();}
255
256  //! Wait workers to finish, then join all threads.
257  ~ThreadPool() {
258    for (size_t i = 0; i < workers_.size(); i++) {
259      Add(NULL);
260    }
261    for (size_t i = 0; i < workers_.size(); i++) {
262      workers_[i]->Join();
263      delete workers_[i];
264    }
265  }
266 private:
267  std::vector<MyThread*>   workers_;
268  ProducerConsumerQueue  queue_;
269
270  static void *Worker(void *p) {
271    ThreadPool *pool = reinterpret_cast<ThreadPool*>(p);
272    while (true) {
273      Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get());
274      if(closure == NULL) {
275        return NULL;
276      }
277      closure->Execute();
278    }
279  }
280};
281
282class MutexLock {  // Scoped Mutex Locker/Unlocker
283 public:
284  MutexLock(Mutex *mu)
285    : mu_(mu) {
286    mu_->Lock();
287  }
288  ~MutexLock() {
289    mu_->Unlock();
290  }
291 private:
292  Mutex *mu_;
293};
294
295class BlockingCounter {
296 public:
297  explicit BlockingCounter(int initial_count) :
298    count_(initial_count) {}
299  bool DecrementCount() {
300    MutexLock lock(&mu_);
301    count_--;
302    return count_ == 0;
303  }
304  void Wait() {
305    mu_.LockWhen(Condition(&IsZero, &count_));
306    mu_.Unlock();
307  }
308 private:
309  static bool IsZero(int *arg) { return *arg == 0; }
310  Mutex mu_;
311  int count_;
312};
313
314//}}}
315
316#endif // THREAD_WRAPPERS_H
317// vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker
318