1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sdk_util/thread_pool.h"
6
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
11
12#include "sdk_util/auto_lock.h"
13
14namespace sdk_util {
15
16// Initializes mutex, semaphores and a pool of threads.  If 0 is passed for
17// num_threads, all work will be performed on the dispatch thread.
18ThreadPool::ThreadPool(int num_threads)
19    : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
20      user_data_(NULL), user_work_function_(NULL) {
21  if (num_threads_ > 0) {
22    int status;
23    status = sem_init(&work_sem_, 0, 0);
24    if (-1 == status) {
25      fprintf(stderr, "Failed to initialize semaphore!\n");
26      exit(-1);
27    }
28    status = sem_init(&done_sem_, 0, 0);
29    if (-1 == status) {
30      fprintf(stderr, "Failed to initialize semaphore!\n");
31      exit(-1);
32    }
33    threads_ = new pthread_t[num_threads_];
34    for (int i = 0; i < num_threads_; i++) {
35      status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
36      if (0 != status) {
37        fprintf(stderr, "Failed to create thread!\n");
38        exit(-1);
39      }
40    }
41  }
42}
43
44// Post exit request, wait for all threads to join, and cleanup.
45ThreadPool::~ThreadPool() {
46  if (num_threads_ > 0) {
47    PostExitAndJoinAll();
48    delete[] threads_;
49    sem_destroy(&done_sem_);
50    sem_destroy(&work_sem_);
51  }
52}
53
54// Setup work parameters.  This function is called from the dispatch thread,
55// when all worker threads are sleeping.
56void ThreadPool::Setup(int counter, WorkFunction work, void *data) {
57  counter_ = counter;
58  user_work_function_ = work;
59  user_data_ = data;
60}
61
62// Return decremented task counter.  This function
63// can be called from multiple threads at any given time.
64int ThreadPool::DecCounter() {
65  return AtomicAddFetch(&counter_, -1);
66}
67
68// Set exit flag, post and join all the threads in the pool.  This function is
69// called only from the dispatch thread, and only when all worker threads are
70// sleeping.
71void ThreadPool::PostExitAndJoinAll() {
72  exiting_ = true;
73  // Wake up all the sleeping worker threads.
74  for (int i = 0; i < num_threads_; ++i)
75    sem_post(&work_sem_);
76  void* retval;
77  for (int i = 0; i < num_threads_; ++i)
78    pthread_join(threads_[i], &retval);
79}
80
81// Main work loop - one for each worker thread.
82void ThreadPool::WorkLoop() {
83  while (true) {
84    // Wait for work. If no work is availble, this thread will sleep here.
85    sem_wait(&work_sem_);
86    if (exiting_) break;
87    while (true) {
88      // Grab a task index to work on from the counter.
89      int task_index = DecCounter();
90      if (task_index < 0)
91        break;
92      user_work_function_(task_index, user_data_);
93    }
94    // Post to dispatch thread work is done.
95    sem_post(&done_sem_);
96  }
97}
98
99// pthread entry point for a worker thread.
100void* ThreadPool::WorkerThreadEntry(void* thiz) {
101  static_cast<ThreadPool*>(thiz)->WorkLoop();
102  return NULL;
103}
104
105// DispatchMany() will dispatch a set of tasks across worker threads.
106// Note: This function will block until all work has completed.
107void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) {
108  // On entry, all worker threads are sleeping.
109  Setup(num_tasks, work, data);
110
111  // Wake up the worker threads & have them process tasks.
112  for (int i = 0; i < num_threads_; i++)
113    sem_post(&work_sem_);
114
115  // Worker threads are now awake and busy.
116
117  // This dispatch thread will now sleep-wait for the worker threads to finish.
118  for (int i = 0; i < num_threads_; i++)
119    sem_wait(&done_sem_);
120  // On exit, all tasks are done and all worker threads are sleeping again.
121}
122
123//  DispatchHere will dispatch all tasks on this thread.
124void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) {
125  for (int i = 0; i < num_tasks; i++)
126    work(i, data);
127}
128
129// Dispatch() will invoke the user supplied work function across
130// one or more threads for each task.
131// Note: This function will block until all work has completed.
132void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) {
133  if (num_threads_ > 0)
134    DispatchMany(num_tasks, work, data);
135  else
136    DispatchHere(num_tasks, work, data);
137}
138
139}  // namespace sdk_util
140
141