1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/worker_pool_posix.h"
6
7#include <set>
8
9#include "base/synchronization/condition_variable.h"
10#include "base/synchronization/lock.h"
11#include "base/task.h"
12#include "base/threading/platform_thread.h"
13#include "base/synchronization/waitable_event.h"
14#include "testing/gtest/include/gtest/gtest.h"
15
16namespace base {
17
18// Peer class to provide passthrough access to PosixDynamicThreadPool internals.
19class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
20 public:
21  explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
22      : pool_(pool) {}
23
24  Lock* lock() { return &pool_->lock_; }
25  ConditionVariable* tasks_available_cv() {
26    return &pool_->tasks_available_cv_;
27  }
28  const std::queue<Task*>& tasks() const { return pool_->tasks_; }
29  int num_idle_threads() const { return pool_->num_idle_threads_; }
30  ConditionVariable* num_idle_threads_cv() {
31    return pool_->num_idle_threads_cv_.get();
32  }
33  void set_num_idle_threads_cv(ConditionVariable* cv) {
34    pool_->num_idle_threads_cv_.reset(cv);
35  }
36
37 private:
38  PosixDynamicThreadPool* pool_;
39
40  DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
41};
42
43namespace {
44
45// IncrementingTask's main purpose is to increment a counter.  It also updates a
46// set of unique thread ids, and signals a ConditionVariable on completion.
47// Note that since it does not block, there is no way to control the number of
48// threads used if more than one IncrementingTask is consecutively posted to the
49// thread pool, since the first one might finish executing before the subsequent
50// PostTask() calls get invoked.
51class IncrementingTask : public Task {
52 public:
53  IncrementingTask(Lock* counter_lock,
54                   int* counter,
55                   Lock* unique_threads_lock,
56                   std::set<PlatformThreadId>* unique_threads)
57      : counter_lock_(counter_lock),
58        unique_threads_lock_(unique_threads_lock),
59        unique_threads_(unique_threads),
60        counter_(counter) {}
61
62  virtual void Run() {
63    AddSelfToUniqueThreadSet();
64    base::AutoLock locked(*counter_lock_);
65    (*counter_)++;
66  }
67
68  void AddSelfToUniqueThreadSet() {
69    base::AutoLock locked(*unique_threads_lock_);
70    unique_threads_->insert(PlatformThread::CurrentId());
71  }
72
73 private:
74  Lock* counter_lock_;
75  Lock* unique_threads_lock_;
76  std::set<PlatformThreadId>* unique_threads_;
77  int* counter_;
78
79  DISALLOW_COPY_AND_ASSIGN(IncrementingTask);
80};
81
82// BlockingIncrementingTask is a simple wrapper around IncrementingTask that
83// allows for waiting at the start of Run() for a WaitableEvent to be signalled.
84class BlockingIncrementingTask : public Task {
85 public:
86  BlockingIncrementingTask(Lock* counter_lock,
87                           int* counter,
88                           Lock* unique_threads_lock,
89                           std::set<PlatformThreadId>* unique_threads,
90                           Lock* num_waiting_to_start_lock,
91                           int* num_waiting_to_start,
92                           ConditionVariable* num_waiting_to_start_cv,
93                           base::WaitableEvent* start)
94      : incrementer_(
95          counter_lock, counter, unique_threads_lock, unique_threads),
96        num_waiting_to_start_lock_(num_waiting_to_start_lock),
97        num_waiting_to_start_(num_waiting_to_start),
98        num_waiting_to_start_cv_(num_waiting_to_start_cv),
99        start_(start) {}
100
101  virtual void Run() {
102    {
103      base::AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_);
104      (*num_waiting_to_start_)++;
105    }
106    num_waiting_to_start_cv_->Signal();
107    CHECK(start_->Wait());
108    incrementer_.Run();
109  }
110
111 private:
112  IncrementingTask incrementer_;
113  Lock* num_waiting_to_start_lock_;
114  int* num_waiting_to_start_;
115  ConditionVariable* num_waiting_to_start_cv_;
116  base::WaitableEvent* start_;
117
118  DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask);
119};
120
121class PosixDynamicThreadPoolTest : public testing::Test {
122 protected:
123  PosixDynamicThreadPoolTest()
124      : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)),
125        peer_(pool_.get()),
126        counter_(0),
127        num_waiting_to_start_(0),
128        num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
129        start_(true, false) {}
130
131  virtual void SetUp() {
132    peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
133  }
134
135  virtual void TearDown() {
136    // Wake up the idle threads so they can terminate.
137    if (pool_.get()) pool_->Terminate();
138  }
139
140  void WaitForTasksToStart(int num_tasks) {
141    base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
142    while (num_waiting_to_start_ < num_tasks) {
143      num_waiting_to_start_cv_.Wait();
144    }
145  }
146
147  void WaitForIdleThreads(int num_idle_threads) {
148    base::AutoLock pool_locked(*peer_.lock());
149    while (peer_.num_idle_threads() < num_idle_threads) {
150      peer_.num_idle_threads_cv()->Wait();
151    }
152  }
153
154  Task* CreateNewIncrementingTask() {
155    return new IncrementingTask(&counter_lock_, &counter_,
156                                &unique_threads_lock_, &unique_threads_);
157  }
158
159  Task* CreateNewBlockingIncrementingTask() {
160    return new BlockingIncrementingTask(
161        &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
162        &num_waiting_to_start_lock_, &num_waiting_to_start_,
163        &num_waiting_to_start_cv_, &start_);
164  }
165
166  scoped_refptr<base::PosixDynamicThreadPool> pool_;
167  base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
168  Lock counter_lock_;
169  int counter_;
170  Lock unique_threads_lock_;
171  std::set<PlatformThreadId> unique_threads_;
172  Lock num_waiting_to_start_lock_;
173  int num_waiting_to_start_;
174  ConditionVariable num_waiting_to_start_cv_;
175  base::WaitableEvent start_;
176};
177
178}  // namespace
179
180TEST_F(PosixDynamicThreadPoolTest, Basic) {
181  EXPECT_EQ(0, peer_.num_idle_threads());
182  EXPECT_EQ(0U, unique_threads_.size());
183  EXPECT_EQ(0U, peer_.tasks().size());
184
185  // Add one task and wait for it to be completed.
186  pool_->PostTask(CreateNewIncrementingTask());
187
188  WaitForIdleThreads(1);
189
190  EXPECT_EQ(1U, unique_threads_.size()) <<
191      "There should be only one thread allocated for one task.";
192  EXPECT_EQ(1, peer_.num_idle_threads());
193  EXPECT_EQ(1, counter_);
194}
195
196TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
197  // Add one task and wait for it to be completed.
198  pool_->PostTask(CreateNewIncrementingTask());
199
200  WaitForIdleThreads(1);
201
202  // Add another 2 tasks.  One should reuse the existing worker thread.
203  pool_->PostTask(CreateNewBlockingIncrementingTask());
204  pool_->PostTask(CreateNewBlockingIncrementingTask());
205
206  WaitForTasksToStart(2);
207  start_.Signal();
208  WaitForIdleThreads(2);
209
210  EXPECT_EQ(2U, unique_threads_.size());
211  EXPECT_EQ(2, peer_.num_idle_threads());
212  EXPECT_EQ(3, counter_);
213}
214
215TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
216  // Add two blocking tasks.
217  pool_->PostTask(CreateNewBlockingIncrementingTask());
218  pool_->PostTask(CreateNewBlockingIncrementingTask());
219
220  EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
221
222  WaitForTasksToStart(2);
223  start_.Signal();
224  WaitForIdleThreads(2);
225
226  EXPECT_EQ(2U, unique_threads_.size());
227  EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
228  EXPECT_EQ(2, counter_);
229}
230
231TEST_F(PosixDynamicThreadPoolTest, Complex) {
232  // Add two non blocking tasks and wait for them to finish.
233  pool_->PostTask(CreateNewIncrementingTask());
234
235  WaitForIdleThreads(1);
236
237  // Add two blocking tasks, start them simultaneously, and wait for them to
238  // finish.
239  pool_->PostTask(CreateNewBlockingIncrementingTask());
240  pool_->PostTask(CreateNewBlockingIncrementingTask());
241
242  WaitForTasksToStart(2);
243  start_.Signal();
244  WaitForIdleThreads(2);
245
246  EXPECT_EQ(3, counter_);
247  EXPECT_EQ(2, peer_.num_idle_threads());
248  EXPECT_EQ(2U, unique_threads_.size());
249
250  // Wake up all idle threads so they can exit.
251  {
252    base::AutoLock locked(*peer_.lock());
253    while (peer_.num_idle_threads() > 0) {
254      peer_.tasks_available_cv()->Signal();
255      peer_.num_idle_threads_cv()->Wait();
256    }
257  }
258
259  // Add another non blocking task.  There are no threads to reuse.
260  pool_->PostTask(CreateNewIncrementingTask());
261  WaitForIdleThreads(1);
262
263  EXPECT_EQ(3U, unique_threads_.size());
264  EXPECT_EQ(1, peer_.num_idle_threads());
265  EXPECT_EQ(4, counter_);
266}
267
268}  // namespace base
269