1868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Use of this source code is governed by a BSD-style license that can be
3868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// found in the LICENSE file.
4868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
5868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)#include "sdk_util/thread_pool.h"
6868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
7868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)#include <pthread.h>
8868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)#include <semaphore.h>
9868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)#include <stdio.h>
10868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)#include <stdlib.h>
11868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
12868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)#include "sdk_util/auto_lock.h"
13868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
14ca12bfac764ba476d6cd062bf1dde12cc64c3f40Ben Murdochnamespace sdk_util {
15ca12bfac764ba476d6cd062bf1dde12cc64c3f40Ben Murdoch
16868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Initializes mutex, semaphores and a pool of threads.  If 0 is passed for
17868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// num_threads, all work will be performed on the dispatch thread.
18868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)ThreadPool::ThreadPool(int num_threads)
19868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
20868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      user_data_(NULL), user_work_function_(NULL) {
21868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  if (num_threads_ > 0) {
22868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    int status;
23868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    status = sem_init(&work_sem_, 0, 0);
24868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    if (-1 == status) {
25868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      fprintf(stderr, "Failed to initialize semaphore!\n");
26868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      exit(-1);
27868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    }
28868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    status = sem_init(&done_sem_, 0, 0);
29868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    if (-1 == status) {
30868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      fprintf(stderr, "Failed to initialize semaphore!\n");
31868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      exit(-1);
32868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    }
33868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    threads_ = new pthread_t[num_threads_];
34868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    for (int i = 0; i < num_threads_; i++) {
35868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
36868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      if (0 != status) {
37868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)        fprintf(stderr, "Failed to create thread!\n");
38868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)        exit(-1);
39868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      }
40868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    }
41868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  }
42868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
43868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
44868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Post exit request, wait for all threads to join, and cleanup.
45868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)ThreadPool::~ThreadPool() {
46868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  if (num_threads_ > 0) {
47868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    PostExitAndJoinAll();
48868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    delete[] threads_;
49868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    sem_destroy(&done_sem_);
50868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    sem_destroy(&work_sem_);
51868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  }
52868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
53868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
54868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Setup work parameters.  This function is called from the dispatch thread,
55868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// when all worker threads are sleeping.
56868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::Setup(int counter, WorkFunction work, void *data) {
57868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  counter_ = counter;
58868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  user_work_function_ = work;
59868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  user_data_ = data;
60868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
61868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
62868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Return decremented task counter.  This function
63868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// can be called from multiple threads at any given time.
64868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)int ThreadPool::DecCounter() {
65ca12bfac764ba476d6cd062bf1dde12cc64c3f40Ben Murdoch  return AtomicAddFetch(&counter_, -1);
66868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
67868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
68868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Set exit flag, post and join all the threads in the pool.  This function is
69868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// called only from the dispatch thread, and only when all worker threads are
70868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// sleeping.
71868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::PostExitAndJoinAll() {
72868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  exiting_ = true;
73868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  // Wake up all the sleeping worker threads.
74868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  for (int i = 0; i < num_threads_; ++i)
75868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    sem_post(&work_sem_);
76868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  void* retval;
77868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  for (int i = 0; i < num_threads_; ++i)
78868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    pthread_join(threads_[i], &retval);
79868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
80868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
81868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Main work loop - one for each worker thread.
82868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::WorkLoop() {
83868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  while (true) {
84868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    // Wait for work. If no work is availble, this thread will sleep here.
85868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    sem_wait(&work_sem_);
86868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    if (exiting_) break;
87868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    while (true) {
88868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      // Grab a task index to work on from the counter.
89868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      int task_index = DecCounter();
90868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      if (task_index < 0)
91868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)        break;
92868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)      user_work_function_(task_index, user_data_);
93868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    }
94868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    // Post to dispatch thread work is done.
95868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    sem_post(&done_sem_);
96868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  }
97868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
98868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
99868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// pthread entry point for a worker thread.
100868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void* ThreadPool::WorkerThreadEntry(void* thiz) {
101868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  static_cast<ThreadPool*>(thiz)->WorkLoop();
102868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  return NULL;
103868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
104868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
105868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// DispatchMany() will dispatch a set of tasks across worker threads.
106868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Note: This function will block until all work has completed.
107868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) {
108868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  // On entry, all worker threads are sleeping.
109868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  Setup(num_tasks, work, data);
110868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
111868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  // Wake up the worker threads & have them process tasks.
112868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  for (int i = 0; i < num_threads_; i++)
113868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    sem_post(&work_sem_);
114868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
115868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  // Worker threads are now awake and busy.
116868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
117868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  // This dispatch thread will now sleep-wait for the worker threads to finish.
118868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  for (int i = 0; i < num_threads_; i++)
119868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    sem_wait(&done_sem_);
120868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  // On exit, all tasks are done and all worker threads are sleeping again.
121868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
122868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
123868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)//  DispatchHere will dispatch all tasks on this thread.
124868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) {
125868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  for (int i = 0; i < num_tasks; i++)
126868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    work(i, data);
127868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
128868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
129868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Dispatch() will invoke the user supplied work function across
130868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// one or more threads for each task.
131868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)// Note: This function will block until all work has completed.
132868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) {
133868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  if (num_threads_ > 0)
134868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    DispatchMany(num_tasks, work, data);
135868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)  else
136868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)    DispatchHere(num_tasks, work, data);
137868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)}
138868fa2fe829687343ffae624259930155e16dbd8Torne (Richard Coles)
139ca12bfac764ba476d6cd062bf1dde12cc64c3f40Ben Murdoch}  // namespace sdk_util
140ca12bfac764ba476d6cd062bf1dde12cc64c3f40Ben Murdoch
141