// Copyright (c) 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "sdk_util/thread_pool.h"

#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

#include "sdk_util/auto_lock.h"

namespace sdk_util {

// Initializes mutex, semaphores and a pool of threads. If 0 is passed for
// num_threads, all work will be performed on the dispatch thread.
18868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)ThreadPool::ThreadPool(int num_threads) 19868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false), 20868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) user_data_(NULL), user_work_function_(NULL) { 21868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) if (num_threads_ > 0) { 22868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) int status; 23868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) status = sem_init(&work_sem_, 0, 0); 24868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) if (-1 == status) { 25868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) fprintf(stderr, "Failed to initialize semaphore!\n"); 26868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) exit(-1); 27868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) } 28868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) status = sem_init(&done_sem_, 0, 0); 29868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) if (-1 == status) { 30868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) fprintf(stderr, "Failed to initialize semaphore!\n"); 31868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) exit(-1); 32868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) } 33868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) threads_ = new pthread_t[num_threads_]; 34868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) for (int i = 0; i < num_threads_; i++) { 35868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this); 36868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) if (0 != status) { 37868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) fprintf(stderr, "Failed to create thread!\n"); 38868fa2fe829687343ffae624259930155e16dbd8Torne (Richard 
Coles) exit(-1); 39868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) } 40868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) } 41868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) } 42868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 43868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 44868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Post exit request, wait for all threads to join, and cleanup. 45868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)ThreadPool::~ThreadPool() { 46868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) if (num_threads_ > 0) { 47868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) PostExitAndJoinAll(); 48868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) delete[] threads_; 49868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) sem_destroy(&done_sem_); 50868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) sem_destroy(&work_sem_); 51868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) } 52868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 53868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 54868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Setup work parameters. This function is called from the dispatch thread, 55868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// when all worker threads are sleeping. 
56868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::Setup(int counter, WorkFunction work, void *data) { 57868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) counter_ = counter; 58868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) user_work_function_ = work; 59868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) user_data_ = data; 60868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 61868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 62868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Return decremented task counter. This function 63868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// can be called from multiple threads at any given time. 64868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)int ThreadPool::DecCounter() { 65ca12bfac764ba476d6cd062bf1dde12cc64c3f40Ben Murdoch return AtomicAddFetch(&counter_, -1); 66868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 67868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 68868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Set exit flag, post and join all the threads in the pool. This function is 69868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// called only from the dispatch thread, and only when all worker threads are 70868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// sleeping. 71868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::PostExitAndJoinAll() { 72868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) exiting_ = true; 73868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) // Wake up all the sleeping worker threads. 
74868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) for (int i = 0; i < num_threads_; ++i) 75868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) sem_post(&work_sem_); 76868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) void* retval; 77868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) for (int i = 0; i < num_threads_; ++i) 78868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) pthread_join(threads_[i], &retval); 79868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 80868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 81868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Main work loop - one for each worker thread. 82868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::WorkLoop() { 83868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) while (true) { 84868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) // Wait for work. If no work is availble, this thread will sleep here. 85868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) sem_wait(&work_sem_); 86868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) if (exiting_) break; 87868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) while (true) { 88868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) // Grab a task index to work on from the counter. 89868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) int task_index = DecCounter(); 90868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) if (task_index < 0) 91868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) break; 92868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) user_work_function_(task_index, user_data_); 93868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) } 94868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) // Post to dispatch thread work is done. 
95868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) sem_post(&done_sem_); 96868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) } 97868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 98868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 99868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// pthread entry point for a worker thread. 100868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void* ThreadPool::WorkerThreadEntry(void* thiz) { 101868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) static_cast<ThreadPool*>(thiz)->WorkLoop(); 102868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) return NULL; 103868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 104868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 105868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// DispatchMany() will dispatch a set of tasks across worker threads. 106868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Note: This function will block until all work has completed. 107868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) { 108868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) // On entry, all worker threads are sleeping. 109868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) Setup(num_tasks, work, data); 110868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 111868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) // Wake up the worker threads & have them process tasks. 112868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) for (int i = 0; i < num_threads_; i++) 113868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) sem_post(&work_sem_); 114868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 115868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) // Worker threads are now awake and busy. 
116868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 117868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) // This dispatch thread will now sleep-wait for the worker threads to finish. 118868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) for (int i = 0; i < num_threads_; i++) 119868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) sem_wait(&done_sem_); 120868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) // On exit, all tasks are done and all worker threads are sleeping again. 121868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 122868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 123868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// DispatchHere will dispatch all tasks on this thread. 124868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) { 125868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) for (int i = 0; i < num_tasks; i++) 126868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) work(i, data); 127868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 128868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 129868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Dispatch() will invoke the user supplied work function across 130868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// one or more threads for each task. 131868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Note: This function will block until all work has completed. 
132868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) { 133868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) if (num_threads_ > 0) 134868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) DispatchMany(num_tasks, work, data); 135868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) else 136868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) DispatchHere(num_tasks, work, data); 137868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)} 138868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles) 139ca12bfac764ba476d6cd062bf1dde12cc64c3f40Ben Murdoch} // namespace sdk_util 140ca12bfac764ba476d6cd062bf1dde12cc64c3f40Ben Murdoch 141