1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/worker_pool_posix.h"
6
7#include <set>
8
9#include "base/bind.h"
10#include "base/callback.h"
11#include "base/synchronization/condition_variable.h"
12#include "base/synchronization/lock.h"
13#include "base/threading/platform_thread.h"
14#include "base/synchronization/waitable_event.h"
15#include "testing/gtest/include/gtest/gtest.h"
16
17namespace base {
18
19// Peer class to provide passthrough access to PosixDynamicThreadPool internals.
20class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
21 public:
22  explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
23      : pool_(pool) {}
24
25  Lock* lock() { return &pool_->lock_; }
26  ConditionVariable* pending_tasks_available_cv() {
27    return &pool_->pending_tasks_available_cv_;
28  }
29  const std::queue<PendingTask>& pending_tasks() const {
30    return pool_->pending_tasks_;
31  }
32  int num_idle_threads() const { return pool_->num_idle_threads_; }
33  ConditionVariable* num_idle_threads_cv() {
34    return pool_->num_idle_threads_cv_.get();
35  }
36  void set_num_idle_threads_cv(ConditionVariable* cv) {
37    pool_->num_idle_threads_cv_.reset(cv);
38  }
39
40 private:
41  PosixDynamicThreadPool* pool_;
42
43  DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
44};
45
46namespace {
47
48// IncrementingTask's main purpose is to increment a counter.  It also updates a
49// set of unique thread ids, and signals a ConditionVariable on completion.
50// Note that since it does not block, there is no way to control the number of
51// threads used if more than one IncrementingTask is consecutively posted to the
52// thread pool, since the first one might finish executing before the subsequent
53// PostTask() calls get invoked.
54void IncrementingTask(Lock* counter_lock,
55                      int* counter,
56                      Lock* unique_threads_lock,
57                      std::set<PlatformThreadId>* unique_threads) {
58  {
59    base::AutoLock locked(*unique_threads_lock);
60    unique_threads->insert(PlatformThread::CurrentId());
61  }
62  base::AutoLock locked(*counter_lock);
63  (*counter)++;
64}
65
// Argument bundle for BlockingIncrementingTask().  That task is a simple
// wrapper around IncrementingTask that first reports that it is about to wait
// and then waits for a WaitableEvent to be signalled before incrementing.
// All members are raw pointers.  The test fixture fills this struct with
// positional aggregate initialization, so the member order must not change.
struct BlockingIncrementingTaskArgs {
  // The next four members are forwarded unchanged to IncrementingTask().
  Lock* counter_lock;
  int* counter;
  Lock* unique_threads_lock;
  std::set<PlatformThreadId>* unique_threads;
  // |num_waiting_to_start| is incremented under |num_waiting_to_start_lock|,
  // after which |num_waiting_to_start_cv| is signalled.
  Lock* num_waiting_to_start_lock;
  int* num_waiting_to_start;
  ConditionVariable* num_waiting_to_start_cv;
  // Event the task waits on before calling IncrementingTask().
  base::WaitableEvent* start;
};
78
79void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) {
80  {
81    base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock);
82    (*args.num_waiting_to_start)++;
83  }
84  args.num_waiting_to_start_cv->Signal();
85  args.start->Wait();
86  IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock,
87                   args.unique_threads);
88}
89
90class PosixDynamicThreadPoolTest : public testing::Test {
91 protected:
92  PosixDynamicThreadPoolTest()
93      : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60*60)),
94        peer_(pool_.get()),
95        counter_(0),
96        num_waiting_to_start_(0),
97        num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
98        start_(true, false) {}
99
100  virtual void SetUp() OVERRIDE {
101    peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
102  }
103
104  virtual void TearDown() OVERRIDE {
105    // Wake up the idle threads so they can terminate.
106    if (pool_.get()) pool_->Terminate();
107  }
108
109  void WaitForTasksToStart(int num_tasks) {
110    base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
111    while (num_waiting_to_start_ < num_tasks) {
112      num_waiting_to_start_cv_.Wait();
113    }
114  }
115
116  void WaitForIdleThreads(int num_idle_threads) {
117    base::AutoLock pool_locked(*peer_.lock());
118    while (peer_.num_idle_threads() < num_idle_threads) {
119      peer_.num_idle_threads_cv()->Wait();
120    }
121  }
122
123  base::Closure CreateNewIncrementingTaskCallback() {
124    return base::Bind(&IncrementingTask, &counter_lock_, &counter_,
125                      &unique_threads_lock_, &unique_threads_);
126  }
127
128  base::Closure CreateNewBlockingIncrementingTaskCallback() {
129    BlockingIncrementingTaskArgs args = {
130        &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
131        &num_waiting_to_start_lock_, &num_waiting_to_start_,
132        &num_waiting_to_start_cv_, &start_
133    };
134    return base::Bind(&BlockingIncrementingTask, args);
135  }
136
137  scoped_refptr<base::PosixDynamicThreadPool> pool_;
138  base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
139  Lock counter_lock_;
140  int counter_;
141  Lock unique_threads_lock_;
142  std::set<PlatformThreadId> unique_threads_;
143  Lock num_waiting_to_start_lock_;
144  int num_waiting_to_start_;
145  ConditionVariable num_waiting_to_start_cv_;
146  base::WaitableEvent start_;
147};
148
149}  // namespace
150
151TEST_F(PosixDynamicThreadPoolTest, Basic) {
152  EXPECT_EQ(0, peer_.num_idle_threads());
153  EXPECT_EQ(0U, unique_threads_.size());
154  EXPECT_EQ(0U, peer_.pending_tasks().size());
155
156  // Add one task and wait for it to be completed.
157  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
158
159  WaitForIdleThreads(1);
160
161  EXPECT_EQ(1U, unique_threads_.size()) <<
162      "There should be only one thread allocated for one task.";
163  EXPECT_EQ(1, counter_);
164}
165
166TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
167  // Add one task and wait for it to be completed.
168  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
169
170  WaitForIdleThreads(1);
171
172  // Add another 2 tasks.  One should reuse the existing worker thread.
173  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
174  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
175
176  WaitForTasksToStart(2);
177  start_.Signal();
178  WaitForIdleThreads(2);
179
180  EXPECT_EQ(2U, unique_threads_.size());
181  EXPECT_EQ(2, peer_.num_idle_threads());
182  EXPECT_EQ(3, counter_);
183}
184
185TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
186  // Add two blocking tasks.
187  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
188  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
189
190  EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
191
192  WaitForTasksToStart(2);
193  start_.Signal();
194  WaitForIdleThreads(2);
195
196  EXPECT_EQ(2U, unique_threads_.size());
197  EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
198  EXPECT_EQ(2, counter_);
199}
200
201TEST_F(PosixDynamicThreadPoolTest, Complex) {
202  // Add two non blocking tasks and wait for them to finish.
203  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
204
205  WaitForIdleThreads(1);
206
207  // Add two blocking tasks, start them simultaneously, and wait for them to
208  // finish.
209  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
210  pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
211
212  WaitForTasksToStart(2);
213  start_.Signal();
214  WaitForIdleThreads(2);
215
216  EXPECT_EQ(3, counter_);
217  EXPECT_EQ(2, peer_.num_idle_threads());
218  EXPECT_EQ(2U, unique_threads_.size());
219
220  // Wake up all idle threads so they can exit.
221  {
222    base::AutoLock locked(*peer_.lock());
223    while (peer_.num_idle_threads() > 0) {
224      peer_.pending_tasks_available_cv()->Signal();
225      peer_.num_idle_threads_cv()->Wait();
226    }
227  }
228
229  // Add another non blocking task.  There are no threads to reuse.
230  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
231  WaitForIdleThreads(1);
232
233  // The POSIX implementation of PlatformThread::CurrentId() uses pthread_self()
234  // which is not guaranteed to be unique after a thread joins. The OS X
235  // implemntation of pthread_self() returns the address of the pthread_t, which
236  // is merely a malloc()ed pointer stored in the first TLS slot. When a thread
237  // joins and that structure is freed, the block of memory can be put on the
238  // OS free list, meaning the same address could be reused in a subsequent
239  // allocation. This in fact happens when allocating in a loop as this test
240  // does.
241  //
242  // Because there are two concurrent threads, there's at least the guarantee
243  // of having two unique thread IDs in the set. But after those two threads are
244  // joined, the next-created thread can get a re-used ID if the allocation of
245  // the pthread_t structure is taken from the free list. Therefore, there can
246  // be either 2 or 3 unique thread IDs in the set at this stage in the test.
247  EXPECT_TRUE(unique_threads_.size() >= 2 && unique_threads_.size() <= 3)
248      << "unique_threads_.size() = " << unique_threads_.size();
249  EXPECT_EQ(1, peer_.num_idle_threads());
250  EXPECT_EQ(4, counter_);
251}
252
253}  // namespace base
254