/*
  This file is part of Valgrind, a dynamic binary instrumentation
  framework.

  Copyright (C) 2008-2008 Google Inc
     opensource@google.com

  This program is free software; you can redistribute it and/or
  modify it under the terms of the GNU General Public License as
  published by the Free Software Foundation; either version 2 of the
  License, or (at your option) any later version.

  This program is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
  02111-1307, USA.

  The GNU General Public License is contained in the file COPYING.
*/

// Author: Konstantin Serebryany <opensource@google.com>
//
// Here we define a few simple classes that wrap pthread primitives.
//
// We need this to create unit tests for helgrind (or similar tool)
// that will work with different threading frameworks.
//
// If one needs to test helgrind's support for another threading library,
// he/she can create a copy of this file and replace pthread_ calls
// with appropriate calls to his/her library.
//
// Note that some of the methods defined here are annotated with
// ANNOTATE_* macros defined in dynamic_annotations.h.
//
// DISCLAIMER: the classes defined in this header file
// are NOT intended for general use -- only for unit tests.
42// 43 44#ifndef THREAD_WRAPPERS_PTHREAD_H 45#define THREAD_WRAPPERS_PTHREAD_H 46 47#include <pthread.h> 48#include <semaphore.h> 49#include <unistd.h> 50#include <queue> 51#include <stdio.h> 52#include <limits.h> // INT_MAX 53 54#ifdef VGO_darwin 55#include <libkern/OSAtomic.h> 56#define NO_BARRIER 57#define NO_TLS 58#endif 59 60#include <string> 61using namespace std; 62 63#include <sys/time.h> 64#include <time.h> 65 66#include "../../drd/drd.h" 67#define ANNOTATE_NO_OP(arg) do { } while(0) 68#define ANNOTATE_EXPECT_RACE(addr, descr) \ 69 ANNOTATE_BENIGN_RACE_SIZED(addr, 4, "expected race") 70static inline bool RunningOnValgrind() { return RUNNING_ON_VALGRIND; } 71 72#include <assert.h> 73#ifdef NDEBUG 74# error "Pleeease, do not define NDEBUG" 75#endif 76#define CHECK assert 77 78/// Set this to true if malloc() uses mutex on your platform as this may 79/// introduce a happens-before arc for a pure happens-before race detector. 80const bool kMallocUsesMutex = false; 81 82/// Current time in milliseconds. 83static inline int64_t GetCurrentTimeMillis() { 84 struct timeval now; 85 gettimeofday(&now, NULL); 86 return now.tv_sec * 1000 + now.tv_usec / 1000; 87} 88 89/// Copy tv to ts adding offset in milliseconds. 90static inline void timeval2timespec(timeval *const tv, 91 timespec *ts, 92 int64_t offset_milli) { 93 const int64_t ten_9 = 1000000000LL; 94 const int64_t ten_6 = 1000000LL; 95 const int64_t ten_3 = 1000LL; 96 int64_t now_nsec = (int64_t)tv->tv_sec * ten_9; 97 now_nsec += (int64_t)tv->tv_usec * ten_3; 98 int64_t then_nsec = now_nsec + offset_milli * ten_6; 99 ts->tv_sec = then_nsec / ten_9; 100 ts->tv_nsec = then_nsec % ten_9; 101} 102 103 104class CondVar; 105 106#ifndef NO_SPINLOCK 107/// helgrind does not (yet) support spin locks, so we annotate them. 
108 109#ifndef VGO_darwin 110class SpinLock { 111 public: 112 SpinLock() { 113 CHECK(0 == pthread_spin_init(&mu_, 0)); 114 ANNOTATE_RWLOCK_CREATE((void*)&mu_); 115 } 116 ~SpinLock() { 117 ANNOTATE_RWLOCK_DESTROY((void*)&mu_); 118 CHECK(0 == pthread_spin_destroy(&mu_)); 119 } 120 void Lock() { 121 CHECK(0 == pthread_spin_lock(&mu_)); 122 ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1); 123 } 124 void Unlock() { 125 ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1); 126 CHECK(0 == pthread_spin_unlock(&mu_)); 127 } 128 private: 129 pthread_spinlock_t mu_; 130}; 131 132#else 133 134class SpinLock { 135 public: 136 // Mac OS X version. 137 SpinLock() : mu_(OS_SPINLOCK_INIT) { 138 ANNOTATE_RWLOCK_CREATE((void*)&mu_); 139 } 140 ~SpinLock() { 141 ANNOTATE_RWLOCK_DESTROY((void*)&mu_); 142 } 143 void Lock() { 144 OSSpinLockLock(&mu_); 145 ANNOTATE_RWLOCK_ACQUIRED((void*)&mu_, 1); 146 } 147 void Unlock() { 148 ANNOTATE_RWLOCK_RELEASED((void*)&mu_, 1); 149 OSSpinLockUnlock(&mu_); 150 } 151 private: 152 OSSpinLock mu_; 153}; 154#endif // VGO_darwin 155 156#endif // NO_SPINLOCK 157 158/// Just a boolean condition. Used by Mutex::LockWhen and similar. 159class Condition { 160 public: 161 typedef bool (*func_t)(void*); 162 163 template <typename T> 164 Condition(bool (*func)(T*), T* arg) 165 : func_(reinterpret_cast<func_t>(func)), arg_(arg) {} 166 167 Condition(bool (*func)()) 168 : func_(reinterpret_cast<func_t>(func)), arg_(NULL) {} 169 170 bool Eval() { return func_(arg_); } 171 private: 172 func_t func_; 173 void *arg_; 174 175}; 176 177 178/// Wrapper for pthread_mutex_t. 179/// 180/// pthread_mutex_t is *not* a reader-writer lock, 181/// so the methods like ReaderLock() aren't really reader locks. 182/// We can not use pthread_rwlock_t because it 183/// does not work with pthread_cond_t. 184/// 185/// TODO: We still need to test reader locks with this class. 
186/// Implement a mode where pthread_rwlock_t will be used 187/// instead of pthread_mutex_t (only when not used with CondVar or LockWhen). 188/// 189class Mutex { 190 friend class CondVar; 191 public: 192 Mutex() { 193 CHECK(0 == pthread_mutex_init(&mu_, NULL)); 194 CHECK(0 == pthread_cond_init(&cv_, NULL)); 195 signal_at_unlock_ = true; // Always signal at Unlock to make 196 // Mutex more friendly to hybrid detectors. 197 } 198 ~Mutex() { 199 CHECK(0 == pthread_cond_destroy(&cv_)); 200 CHECK(0 == pthread_mutex_destroy(&mu_)); 201 } 202 void Lock() { CHECK(0 == pthread_mutex_lock(&mu_));} 203 bool TryLock() { return (0 == pthread_mutex_trylock(&mu_));} 204 void Unlock() { 205 if (signal_at_unlock_) { 206 CHECK(0 == pthread_cond_signal(&cv_)); 207 } 208 CHECK(0 == pthread_mutex_unlock(&mu_)); 209 } 210 void ReaderLock() { Lock(); } 211 bool ReaderTryLock() { return TryLock();} 212 void ReaderUnlock() { Unlock(); } 213 214 void LockWhen(Condition cond) { Lock(); WaitLoop(cond); } 215 void ReaderLockWhen(Condition cond) { Lock(); WaitLoop(cond); } 216 void Await(Condition cond) { WaitLoop(cond); } 217 218 bool ReaderLockWhenWithTimeout(Condition cond, int millis) 219 { Lock(); return WaitLoopWithTimeout(cond, millis); } 220 bool LockWhenWithTimeout(Condition cond, int millis) 221 { Lock(); return WaitLoopWithTimeout(cond, millis); } 222 bool AwaitWithTimeout(Condition cond, int millis) 223 { return WaitLoopWithTimeout(cond, millis); } 224 225 private: 226 227 void WaitLoop(Condition cond) { 228 signal_at_unlock_ = true; 229 while(cond.Eval() == false) { 230 pthread_cond_wait(&cv_, &mu_); 231 } 232 ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_); 233 } 234 235 bool WaitLoopWithTimeout(Condition cond, int millis) { 236 struct timeval now; 237 struct timespec timeout; 238 int retcode = 0; 239 gettimeofday(&now, NULL); 240 timeval2timespec(&now, &timeout, millis); 241 242 signal_at_unlock_ = true; 243 while (cond.Eval() == false && retcode == 0) { 244 retcode = 
pthread_cond_timedwait(&cv_, &mu_, &timeout); 245 } 246 if(retcode == 0) { 247 ANNOTATE_CONDVAR_LOCK_WAIT(&cv_, &mu_); 248 } 249 return cond.Eval(); 250 } 251 252 // A hack. cv_ should be the first data member so that 253 // ANNOTATE_CONDVAR_WAIT(&MU, &MU) and ANNOTATE_CONDVAR_SIGNAL(&MU) works. 254 // (See also racecheck_unittest.cc) 255 pthread_cond_t cv_; 256 pthread_mutex_t mu_; 257 bool signal_at_unlock_; // Set to true if Wait was called. 258}; 259 260 261class MutexLock { // Scoped Mutex Locker/Unlocker 262 public: 263 MutexLock(Mutex *mu) 264 : mu_(mu) { 265 mu_->Lock(); 266 } 267 ~MutexLock() { 268 mu_->Unlock(); 269 } 270 private: 271 Mutex *mu_; 272}; 273 274 275/// Wrapper for pthread_cond_t. 276class CondVar { 277 public: 278 CondVar() { CHECK(0 == pthread_cond_init(&cv_, NULL)); } 279 ~CondVar() { CHECK(0 == pthread_cond_destroy(&cv_)); } 280 void Wait(Mutex *mu) { CHECK(0 == pthread_cond_wait(&cv_, &mu->mu_)); } 281 bool WaitWithTimeout(Mutex *mu, int millis) { 282 struct timeval now; 283 struct timespec timeout; 284 gettimeofday(&now, NULL); 285 timeval2timespec(&now, &timeout, millis); 286 return 0 != pthread_cond_timedwait(&cv_, &mu->mu_, &timeout); 287 } 288 void Signal() { CHECK(0 == pthread_cond_signal(&cv_)); } 289 void SignalAll() { CHECK(0 == pthread_cond_broadcast(&cv_)); } 290 private: 291 pthread_cond_t cv_; 292}; 293 294 295// pthreads do not allow to use condvar with rwlock so we can't make 296// ReaderLock method of Mutex to be the real rw-lock. 297// So, we need a special lock class to test reader locks. 
298#define NEEDS_SEPERATE_RW_LOCK 299class RWLock { 300 public: 301 RWLock() { CHECK(0 == pthread_rwlock_init(&mu_, NULL)); } 302 ~RWLock() { CHECK(0 == pthread_rwlock_destroy(&mu_)); } 303 void Lock() { CHECK(0 == pthread_rwlock_wrlock(&mu_)); } 304 void ReaderLock() { CHECK(0 == pthread_rwlock_rdlock(&mu_)); } 305 void Unlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); } 306 void ReaderUnlock() { CHECK(0 == pthread_rwlock_unlock(&mu_)); } 307 private: 308 pthread_cond_t dummy; // Damn, this requires some redesign... 309 pthread_rwlock_t mu_; 310}; 311 312class ReaderLockScoped { // Scoped RWLock Locker/Unlocker 313 public: 314 ReaderLockScoped(RWLock *mu) 315 : mu_(mu) { 316 mu_->ReaderLock(); 317 } 318 ~ReaderLockScoped() { 319 mu_->ReaderUnlock(); 320 } 321 private: 322 RWLock *mu_; 323}; 324 325class WriterLockScoped { // Scoped RWLock Locker/Unlocker 326 public: 327 WriterLockScoped(RWLock *mu) 328 : mu_(mu) { 329 mu_->Lock(); 330 } 331 ~WriterLockScoped() { 332 mu_->Unlock(); 333 } 334 private: 335 RWLock *mu_; 336}; 337 338 339 340 341/// Wrapper for pthread_create()/pthread_join(). 
342class MyThread { 343 public: 344 typedef void *(*worker_t)(void*); 345 346 MyThread(worker_t worker, void *arg = NULL, const char *name = NULL) 347 :w_(worker), arg_(arg), name_(name) {} 348 MyThread(void (*worker)(void), void *arg = NULL, const char *name = NULL) 349 :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {} 350 MyThread(void (*worker)(void *), void *arg = NULL, const char *name = NULL) 351 :w_(reinterpret_cast<worker_t>(worker)), arg_(arg), name_(name) {} 352 353 ~MyThread(){ w_ = NULL; arg_ = NULL;} 354 void Start() { CHECK(0 == pthread_create(&t_, NULL, (worker_t)ThreadBody, this));} 355 void Join() { CHECK(0 == pthread_join(t_, NULL));} 356 pthread_t tid() const { return t_; } 357 private: 358 static void ThreadBody(MyThread *my_thread) { 359 if (my_thread->name_) { 360 ANNOTATE_THREAD_NAME(my_thread->name_); 361 } 362 my_thread->w_(my_thread->arg_); 363 } 364 pthread_t t_; 365 worker_t w_; 366 void *arg_; 367 const char *name_; 368}; 369 370 371/// Just a message queue. 372class ProducerConsumerQueue { 373 public: 374 ProducerConsumerQueue(int unused) { 375 //ANNOTATE_PCQ_CREATE(this); 376 } 377 ~ProducerConsumerQueue() { 378 CHECK(q_.empty()); 379 //ANNOTATE_PCQ_DESTROY(this); 380 } 381 382 // Put. 383 void Put(void *item) { 384 mu_.Lock(); 385 q_.push(item); 386 ANNOTATE_CONDVAR_SIGNAL(&mu_); // LockWhen in Get() 387 //ANNOTATE_PCQ_PUT(this); 388 mu_.Unlock(); 389 } 390 391 // Get. 392 // Blocks if the queue is empty. 393 void *Get() { 394 mu_.LockWhen(Condition(IsQueueNotEmpty, &q_)); 395 void * item = NULL; 396 bool ok = TryGetInternal(&item); 397 CHECK(ok); 398 mu_.Unlock(); 399 return item; 400 } 401 402 // If queue is not empty, 403 // remove an element from queue, put it into *res and return true. 404 // Otherwise return false. 
405 bool TryGet(void **res) { 406 mu_.Lock(); 407 bool ok = TryGetInternal(res); 408 mu_.Unlock(); 409 return ok; 410 } 411 412 private: 413 Mutex mu_; 414 std::queue<void*> q_; // protected by mu_ 415 416 // Requires mu_ 417 bool TryGetInternal(void ** item_ptr) { 418 if (q_.empty()) 419 return false; 420 *item_ptr = q_.front(); 421 q_.pop(); 422 //ANNOTATE_PCQ_GET(this); 423 return true; 424 } 425 426 static bool IsQueueNotEmpty(std::queue<void*> * queue) { 427 return !queue->empty(); 428 } 429}; 430 431 432 433/// Function pointer with zero, one or two parameters. 434struct Closure { 435 typedef void (*F0)(); 436 typedef void (*F1)(void *arg1); 437 typedef void (*F2)(void *arg1, void *arg2); 438 int n_params; 439 void *f; 440 void *param1; 441 void *param2; 442 443 void Execute() { 444 if (n_params == 0) { 445 (F0(f))(); 446 } else if (n_params == 1) { 447 (F1(f))(param1); 448 } else { 449 CHECK(n_params == 2); 450 (F2(f))(param1, param2); 451 } 452 delete this; 453 } 454}; 455 456Closure *NewCallback(void (*f)()) { 457 Closure *res = new Closure; 458 res->n_params = 0; 459 res->f = (void*)(f); 460 res->param1 = NULL; 461 res->param2 = NULL; 462 return res; 463} 464 465template <class P1> 466Closure *NewCallback(void (*f)(P1), P1 p1) { 467 CHECK(sizeof(P1) <= sizeof(void*)); 468 Closure *res = new Closure; 469 res->n_params = 1; 470 res->f = (void*)(f); 471 res->param1 = (void*)p1; 472 res->param2 = NULL; 473 return res; 474} 475 476template <class T, class P1, class P2> 477Closure *NewCallback(void (*f)(P1, P2), P1 p1, P2 p2) { 478 CHECK(sizeof(P1) <= sizeof(void*)); 479 Closure *res = new Closure; 480 res->n_params = 2; 481 res->f = (void*)(f); 482 res->param1 = (void*)p1; 483 res->param2 = (void*)p2; 484 return res; 485} 486 487/*! A thread pool that uses ProducerConsumerQueue. 
488 Usage: 489 { 490 ThreadPool pool(n_workers); 491 pool.StartWorkers(); 492 pool.Add(NewCallback(func_with_no_args)); 493 pool.Add(NewCallback(func_with_one_arg, arg)); 494 pool.Add(NewCallback(func_with_two_args, arg1, arg2)); 495 ... // more calls to pool.Add() 496 497 // the ~ThreadPool() is called: we wait workers to finish 498 // and then join all threads in the pool. 499 } 500*/ 501class ThreadPool { 502 public: 503 //! Create n_threads threads, but do not start. 504 explicit ThreadPool(int n_threads) 505 : queue_(INT_MAX) { 506 for (int i = 0; i < n_threads; i++) { 507 MyThread *thread = new MyThread(&ThreadPool::Worker, this); 508 workers_.push_back(thread); 509 } 510 } 511 512 //! Start all threads. 513 void StartWorkers() { 514 for (size_t i = 0; i < workers_.size(); i++) { 515 workers_[i]->Start(); 516 } 517 } 518 519 //! Add a closure. 520 void Add(Closure *closure) { 521 queue_.Put(closure); 522 } 523 524 int num_threads() { return workers_.size();} 525 526 //! Wait workers to finish, then join all threads. 527 ~ThreadPool() { 528 for (size_t i = 0; i < workers_.size(); i++) { 529 Add(NULL); 530 } 531 for (size_t i = 0; i < workers_.size(); i++) { 532 workers_[i]->Join(); 533 delete workers_[i]; 534 } 535 } 536 private: 537 std::vector<MyThread*> workers_; 538 ProducerConsumerQueue queue_; 539 540 static void *Worker(void *p) { 541 ThreadPool *pool = reinterpret_cast<ThreadPool*>(p); 542 while (true) { 543 Closure *closure = reinterpret_cast<Closure*>(pool->queue_.Get()); 544 if(closure == NULL) { 545 return NULL; 546 } 547 closure->Execute(); 548 } 549 } 550}; 551 552#ifndef NO_BARRIER 553/// Wrapper for pthread_barrier_t. 554class Barrier{ 555 public: 556 explicit Barrier(int n_threads) {CHECK(0 == pthread_barrier_init(&b_, 0, n_threads));} 557 ~Barrier() {CHECK(0 == pthread_barrier_destroy(&b_));} 558 void Block() { 559 // helgrind 3.3.0 does not have an interceptor for barrier. 560 // but our current local version does. 
561 // ANNOTATE_CONDVAR_SIGNAL(this); 562 pthread_barrier_wait(&b_); 563 // ANNOTATE_CONDVAR_WAIT(this, this); 564 } 565 private: 566 pthread_barrier_t b_; 567}; 568 569#endif // NO_BARRIER 570 571class BlockingCounter { 572 public: 573 explicit BlockingCounter(int initial_count) : 574 count_(initial_count) {} 575 bool DecrementCount() { 576 MutexLock lock(&mu_); 577 count_--; 578 return count_ == 0; 579 } 580 void Wait() { 581 mu_.LockWhen(Condition(&IsZero, &count_)); 582 mu_.Unlock(); 583 } 584 private: 585 static bool IsZero(int *arg) { return *arg == 0; } 586 Mutex mu_; 587 int count_; 588}; 589 590int AtomicIncrement(volatile int *value, int increment); 591 592#ifndef VGO_darwin 593inline int AtomicIncrement(volatile int *value, int increment) { 594 return __sync_add_and_fetch(value, increment); 595} 596 597#else 598// Mac OS X version. 599inline int AtomicIncrement(volatile int *value, int increment) { 600 return OSAtomicAdd32(increment, value); 601} 602 603// TODO(timurrrr) this is a hack 604#define memalign(A,B) malloc(B) 605 606// TODO(timurrrr) this is a hack 607int posix_memalign(void **out, size_t al, size_t size) { 608 *out = memalign(al, size); 609 return (*out == 0); 610} 611#endif // VGO_darwin 612 613#endif // THREAD_WRAPPERS_PTHREAD_H 614// vim:shiftwidth=2:softtabstop=2:expandtab:foldmethod=marker 615