1// Copyright (c) 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "sdk_util/thread_pool.h" 6 7#include <pthread.h> 8#include <semaphore.h> 9#include <stdio.h> 10#include <stdlib.h> 11 12#include "sdk_util/auto_lock.h" 13 14namespace sdk_util { 15 16// Initializes mutex, semaphores and a pool of threads. If 0 is passed for 17// num_threads, all work will be performed on the dispatch thread. 18ThreadPool::ThreadPool(int num_threads) 19 : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), 20 user_data_(NULL), user_work_function_(NULL) { 21 if (num_threads_ > 0) { 22 int status; 23 status = sem_init(&work_sem_, 0, 0); 24 if (-1 == status) { 25 fprintf(stderr, "Failed to initialize semaphore!\n"); 26 exit(-1); 27 } 28 status = sem_init(&done_sem_, 0, 0); 29 if (-1 == status) { 30 fprintf(stderr, "Failed to initialize semaphore!\n"); 31 exit(-1); 32 } 33 threads_ = new pthread_t[num_threads_]; 34 for (int i = 0; i < num_threads_; i++) { 35 status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); 36 if (0 != status) { 37 fprintf(stderr, "Failed to create thread!\n"); 38 exit(-1); 39 } 40 } 41 } 42} 43 44// Post exit request, wait for all threads to join, and cleanup. 45ThreadPool::~ThreadPool() { 46 if (num_threads_ > 0) { 47 PostExitAndJoinAll(); 48 delete[] threads_; 49 sem_destroy(&done_sem_); 50 sem_destroy(&work_sem_); 51 } 52} 53 54// Setup work parameters. This function is called from the dispatch thread, 55// when all worker threads are sleeping. 56void ThreadPool::Setup(int counter, WorkFunction work, void *data) { 57 counter_ = counter; 58 user_work_function_ = work; 59 user_data_ = data; 60} 61 62// Return decremented task counter. This function 63// can be called from multiple threads at any given time. 64int ThreadPool::DecCounter() { 65 return AtomicAddFetch(&counter_, -1); 66} 67 68// Set exit flag, post and join all the threads in the pool. This function is 69// called only from the dispatch thread, and only when all worker threads are 70// sleeping. 71void ThreadPool::PostExitAndJoinAll() { 72 exiting_ = true; 73 // Wake up all the sleeping worker threads. 74 for (int i = 0; i < num_threads_; ++i) 75 sem_post(&work_sem_); 76 void* retval; 77 for (int i = 0; i < num_threads_; ++i) 78 pthread_join(threads_[i], &retval); 79} 80 81// Main work loop - one for each worker thread. 82void ThreadPool::WorkLoop() { 83 while (true) { 84 // Wait for work. If no work is availble, this thread will sleep here. 85 sem_wait(&work_sem_); 86 if (exiting_) break; 87 while (true) { 88 // Grab a task index to work on from the counter. 89 int task_index = DecCounter(); 90 if (task_index < 0) 91 break; 92 user_work_function_(task_index, user_data_); 93 } 94 // Post to dispatch thread work is done. 95 sem_post(&done_sem_); 96 } 97} 98 99// pthread entry point for a worker thread. 100void* ThreadPool::WorkerThreadEntry(void* thiz) { 101 static_cast<ThreadPool*>(thiz)->WorkLoop(); 102 return NULL; 103} 104 105// DispatchMany() will dispatch a set of tasks across worker threads. 106// Note: This function will block until all work has completed. 107void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) { 108 // On entry, all worker threads are sleeping. 109 Setup(num_tasks, work, data); 110 111 // Wake up the worker threads & have them process tasks. 112 for (int i = 0; i < num_threads_; i++) 113 sem_post(&work_sem_); 114 115 // Worker threads are now awake and busy. 116 117 // This dispatch thread will now sleep-wait for the worker threads to finish. 118 for (int i = 0; i < num_threads_; i++) 119 sem_wait(&done_sem_); 120 // On exit, all tasks are done and all worker threads are sleeping again. 121} 122 123// DispatchHere will dispatch all tasks on this thread. 124void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) { 125 for (int i = 0; i < num_tasks; i++) 126 work(i, data); 127} 128 129// Dispatch() will invoke the user supplied work function across 130// one or more threads for each task. 131// Note: This function will block until all work has completed. 132void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) { 133 if (num_threads_ > 0) 134 DispatchMany(num_tasks, work, data); 135 else 136 DispatchHere(num_tasks, work, data); 137} 138 139} // namespace sdk_util 140 141