1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/sequenced_worker_pool.h"
6
7#include <algorithm>
8
9#include "base/bind.h"
10#include "base/compiler_specific.h"
11#include "base/memory/ref_counted.h"
12#include "base/memory/scoped_ptr.h"
13#include "base/message_loop/message_loop.h"
14#include "base/message_loop/message_loop_proxy.h"
15#include "base/synchronization/condition_variable.h"
16#include "base/synchronization/lock.h"
17#include "base/test/sequenced_task_runner_test_template.h"
18#include "base/test/sequenced_worker_pool_owner.h"
19#include "base/test/task_runner_test_template.h"
20#include "base/test/test_timeouts.h"
21#include "base/threading/platform_thread.h"
22#include "base/time/time.h"
23#include "base/tracked_objects.h"
24#include "testing/gtest/include/gtest/gtest.h"
25
26namespace base {
27
28// IMPORTANT NOTE:
29//
30// Many of these tests have failure modes where they'll hang forever. These
31// tests should not be flaky, and hanging indicates a type of failure. Do not
32// mark as flaky if they're hanging, it's likely an actual bug.
33
34namespace {
35
// Number of worker threads each pool owned by SequencedWorkerPoolTest is
// created with (see SequencedWorkerPoolTest::ResetPool()).
const size_t kNumWorkerThreads = 3;
37
38// Allows a number of threads to all be blocked on the same event, and
39// provides a way to unblock a certain number of them.
40class ThreadBlocker {
41 public:
42  ThreadBlocker() : lock_(), cond_var_(&lock_), unblock_counter_(0) {}
43
44  void Block() {
45    {
46      base::AutoLock lock(lock_);
47      while (unblock_counter_ == 0)
48        cond_var_.Wait();
49      unblock_counter_--;
50    }
51    cond_var_.Signal();
52  }
53
54  void Unblock(size_t count) {
55    {
56      base::AutoLock lock(lock_);
57      DCHECK(unblock_counter_ == 0);
58      unblock_counter_ = count;
59    }
60    cond_var_.Signal();
61  }
62
63 private:
64  base::Lock lock_;
65  base::ConditionVariable cond_var_;
66
67  size_t unblock_counter_;
68};
69
70class TestTracker : public base::RefCountedThreadSafe<TestTracker> {
71 public:
72  TestTracker()
73      : lock_(),
74        cond_var_(&lock_),
75        started_events_(0) {
76  }
77
78  // Each of these tasks appends the argument to the complete sequence vector
79  // so calling code can see what order they finished in.
80  void FastTask(int id) {
81    SignalWorkerDone(id);
82  }
83
84  void SlowTask(int id) {
85    base::PlatformThread::Sleep(base::TimeDelta::FromSeconds(1));
86    SignalWorkerDone(id);
87  }
88
89  void BlockTask(int id, ThreadBlocker* blocker) {
90    // Note that this task has started and signal anybody waiting for that
91    // to happen.
92    {
93      base::AutoLock lock(lock_);
94      started_events_++;
95    }
96    cond_var_.Signal();
97
98    blocker->Block();
99    SignalWorkerDone(id);
100  }
101
102  void PostAdditionalTasks(
103        int id, SequencedWorkerPool* pool,
104        bool expected_return_value) {
105    Closure fast_task = base::Bind(&TestTracker::FastTask, this, 100);
106    EXPECT_EQ(expected_return_value,
107              pool->PostWorkerTaskWithShutdownBehavior(
108                  FROM_HERE, fast_task,
109                  SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
110    EXPECT_EQ(expected_return_value,
111              pool->PostWorkerTaskWithShutdownBehavior(
112                  FROM_HERE, fast_task,
113                  SequencedWorkerPool::SKIP_ON_SHUTDOWN));
114    pool->PostWorkerTaskWithShutdownBehavior(
115        FROM_HERE, fast_task,
116        SequencedWorkerPool::BLOCK_SHUTDOWN);
117    SignalWorkerDone(id);
118  }
119
120  // Waits until the given number of tasks have started executing.
121  void WaitUntilTasksBlocked(size_t count) {
122    {
123      base::AutoLock lock(lock_);
124      while (started_events_ < count)
125        cond_var_.Wait();
126    }
127    cond_var_.Signal();
128  }
129
130  // Blocks the current thread until at least the given number of tasks are in
131  // the completed vector, and then returns a copy.
132  std::vector<int> WaitUntilTasksComplete(size_t num_tasks) {
133    std::vector<int> ret;
134    {
135      base::AutoLock lock(lock_);
136      while (complete_sequence_.size() < num_tasks)
137        cond_var_.Wait();
138      ret = complete_sequence_;
139    }
140    cond_var_.Signal();
141    return ret;
142  }
143
144  size_t GetTasksCompletedCount() {
145    base::AutoLock lock(lock_);
146    return complete_sequence_.size();
147  }
148
149  void ClearCompleteSequence() {
150    base::AutoLock lock(lock_);
151    complete_sequence_.clear();
152    started_events_ = 0;
153  }
154
155 private:
156  friend class base::RefCountedThreadSafe<TestTracker>;
157  ~TestTracker() {}
158
159  void SignalWorkerDone(int id) {
160    {
161      base::AutoLock lock(lock_);
162      complete_sequence_.push_back(id);
163    }
164    cond_var_.Signal();
165  }
166
167  // Protects the complete_sequence.
168  base::Lock lock_;
169
170  base::ConditionVariable cond_var_;
171
172  // Protected by lock_.
173  std::vector<int> complete_sequence_;
174
175  // Counter of the number of "block" workers that have started.
176  size_t started_events_;
177};
178
179class SequencedWorkerPoolTest : public testing::Test {
180 public:
181  SequencedWorkerPoolTest()
182      : tracker_(new TestTracker) {
183    ResetPool();
184  }
185
186  virtual ~SequencedWorkerPoolTest() {}
187
188  virtual void SetUp() OVERRIDE {}
189
190  virtual void TearDown() OVERRIDE {
191    pool()->Shutdown();
192  }
193
194  const scoped_refptr<SequencedWorkerPool>& pool() {
195    return pool_owner_->pool();
196  }
197  TestTracker* tracker() { return tracker_.get(); }
198
199  // Destroys the SequencedWorkerPool instance, blocking until it is fully shut
200  // down, and creates a new instance.
201  void ResetPool() {
202    pool_owner_.reset(new SequencedWorkerPoolOwner(kNumWorkerThreads, "test"));
203  }
204
205  void SetWillWaitForShutdownCallback(const Closure& callback) {
206    pool_owner_->SetWillWaitForShutdownCallback(callback);
207  }
208
209  // Ensures that the given number of worker threads is created by adding
210  // tasks and waiting until they complete. Worker thread creation is
211  // serialized, can happen on background threads asynchronously, and doesn't
212  // happen any more at shutdown. This means that if a test posts a bunch of
213  // tasks and calls shutdown, fewer workers will be created than the test may
214  // expect.
215  //
216  // This function ensures that this condition can't happen so tests can make
217  // assumptions about the number of workers active. See the comment in
218  // PrepareToStartAdditionalThreadIfNecessary in the .cc file for more
219  // details.
220  //
221  // It will post tasks to the queue with id -1. It also assumes this is the
222  // first thing called in a test since it will clear the complete_sequence_.
223  void EnsureAllWorkersCreated() {
224    // Create a bunch of threads, all waiting. This will cause that may
225    // workers to be created.
226    ThreadBlocker blocker;
227    for (size_t i = 0; i < kNumWorkerThreads; i++) {
228      pool()->PostWorkerTask(FROM_HERE,
229                             base::Bind(&TestTracker::BlockTask,
230                                        tracker(), -1, &blocker));
231    }
232    tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
233
234    // Now wake them up and wait until they're done.
235    blocker.Unblock(kNumWorkerThreads);
236    tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
237
238    // Clean up the task IDs we added.
239    tracker()->ClearCompleteSequence();
240  }
241
242  int has_work_call_count() const {
243    return pool_owner_->has_work_call_count();
244  }
245
246 private:
247  MessageLoop message_loop_;
248  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
249  const scoped_refptr<TestTracker> tracker_;
250};
251
252// Checks that the given number of entries are in the tasks to complete of
253// the given tracker, and then signals the given event the given number of
254// times. This is used to wakt up blocked background threads before blocking
255// on shutdown.
256void EnsureTasksToCompleteCountAndUnblock(scoped_refptr<TestTracker> tracker,
257                                          size_t expected_tasks_to_complete,
258                                          ThreadBlocker* blocker,
259                                          size_t threads_to_awake) {
260  EXPECT_EQ(
261      expected_tasks_to_complete,
262      tracker->WaitUntilTasksComplete(expected_tasks_to_complete).size());
263
264  blocker->Unblock(threads_to_awake);
265}
266
267class DeletionHelper : public base::RefCountedThreadSafe<DeletionHelper> {
268 public:
269  explicit DeletionHelper(
270      const scoped_refptr<base::RefCountedData<bool> >& deleted_flag)
271      : deleted_flag_(deleted_flag) {
272  }
273
274 private:
275  friend class base::RefCountedThreadSafe<DeletionHelper>;
276  virtual ~DeletionHelper() { deleted_flag_->data = true; }
277
278  const scoped_refptr<base::RefCountedData<bool> > deleted_flag_;
279  DISALLOW_COPY_AND_ASSIGN(DeletionHelper);
280};
281
282void HoldPoolReference(const scoped_refptr<base::SequencedWorkerPool>& pool,
283                       const scoped_refptr<DeletionHelper>& helper) {
284  ADD_FAILURE() << "Should never run";
285}
286
287// Tests that delayed tasks are deleted upon shutdown of the pool.
288TEST_F(SequencedWorkerPoolTest, DelayedTaskDuringShutdown) {
289  // Post something to verify the pool is started up.
290  EXPECT_TRUE(pool()->PostTask(
291      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 1)));
292
293  scoped_refptr<base::RefCountedData<bool> > deleted_flag(
294      new base::RefCountedData<bool>(false));
295
296  base::Time posted_at(base::Time::Now());
297  // Post something that shouldn't run.
298  EXPECT_TRUE(pool()->PostDelayedTask(
299      FROM_HERE,
300      base::Bind(&HoldPoolReference,
301                 pool(),
302                 make_scoped_refptr(new DeletionHelper(deleted_flag))),
303      TestTimeouts::action_timeout()));
304
305  std::vector<int> completion_sequence = tracker()->WaitUntilTasksComplete(1);
306  ASSERT_EQ(1u, completion_sequence.size());
307  ASSERT_EQ(1, completion_sequence[0]);
308
309  pool()->Shutdown();
310  // Shutdown is asynchronous, so use ResetPool() to block until the pool is
311  // fully destroyed (and thus shut down).
312  ResetPool();
313
314  // Verify that we didn't block until the task was due.
315  ASSERT_LT(base::Time::Now() - posted_at, TestTimeouts::action_timeout());
316
317  // Verify that the deferred task has not only not run, but has also been
318  // destroyed.
319  ASSERT_TRUE(deleted_flag->data);
320}
321
322// Tests that same-named tokens have the same ID.
323TEST_F(SequencedWorkerPoolTest, NamedTokens) {
324  const std::string name1("hello");
325  SequencedWorkerPool::SequenceToken token1 =
326      pool()->GetNamedSequenceToken(name1);
327
328  SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
329
330  const std::string name3("goodbye");
331  SequencedWorkerPool::SequenceToken token3 =
332      pool()->GetNamedSequenceToken(name3);
333
334  // All 3 tokens should be different.
335  EXPECT_FALSE(token1.Equals(token2));
336  EXPECT_FALSE(token1.Equals(token3));
337  EXPECT_FALSE(token2.Equals(token3));
338
339  // Requesting the same name again should give the same value.
340  SequencedWorkerPool::SequenceToken token1again =
341      pool()->GetNamedSequenceToken(name1);
342  EXPECT_TRUE(token1.Equals(token1again));
343
344  SequencedWorkerPool::SequenceToken token3again =
345      pool()->GetNamedSequenceToken(name3);
346  EXPECT_TRUE(token3.Equals(token3again));
347}
348
349// Tests that posting a bunch of tasks (many more than the number of worker
350// threads) runs them all.
351TEST_F(SequencedWorkerPoolTest, LotsOfTasks) {
352  pool()->PostWorkerTask(FROM_HERE,
353                         base::Bind(&TestTracker::SlowTask, tracker(), 0));
354
355  const size_t kNumTasks = 20;
356  for (size_t i = 1; i < kNumTasks; i++) {
357    pool()->PostWorkerTask(FROM_HERE,
358                           base::Bind(&TestTracker::FastTask, tracker(), i));
359  }
360
361  std::vector<int> result = tracker()->WaitUntilTasksComplete(kNumTasks);
362  EXPECT_EQ(kNumTasks, result.size());
363}
364
365// Tests that posting a bunch of tasks (many more than the number of
366// worker threads) to two pools simultaneously runs them all twice.
367// This test is meant to shake out any concurrency issues between
368// pools (like histograms).
369TEST_F(SequencedWorkerPoolTest, LotsOfTasksTwoPools) {
370  SequencedWorkerPoolOwner pool1(kNumWorkerThreads, "test1");
371  SequencedWorkerPoolOwner pool2(kNumWorkerThreads, "test2");
372
373  base::Closure slow_task = base::Bind(&TestTracker::SlowTask, tracker(), 0);
374  pool1.pool()->PostWorkerTask(FROM_HERE, slow_task);
375  pool2.pool()->PostWorkerTask(FROM_HERE, slow_task);
376
377  const size_t kNumTasks = 20;
378  for (size_t i = 1; i < kNumTasks; i++) {
379    base::Closure fast_task =
380        base::Bind(&TestTracker::FastTask, tracker(), i);
381    pool1.pool()->PostWorkerTask(FROM_HERE, fast_task);
382    pool2.pool()->PostWorkerTask(FROM_HERE, fast_task);
383  }
384
385  std::vector<int> result =
386      tracker()->WaitUntilTasksComplete(2*kNumTasks);
387  EXPECT_EQ(2 * kNumTasks, result.size());
388
389  pool2.pool()->Shutdown();
390  pool1.pool()->Shutdown();
391}
392
393// Test that tasks with the same sequence token are executed in order but don't
394// affect other tasks.
395TEST_F(SequencedWorkerPoolTest, Sequence) {
396  // Fill all the worker threads except one.
397  const size_t kNumBackgroundTasks = kNumWorkerThreads - 1;
398  ThreadBlocker background_blocker;
399  for (size_t i = 0; i < kNumBackgroundTasks; i++) {
400    pool()->PostWorkerTask(FROM_HERE,
401                           base::Bind(&TestTracker::BlockTask,
402                                      tracker(), i, &background_blocker));
403  }
404  tracker()->WaitUntilTasksBlocked(kNumBackgroundTasks);
405
406  // Create two tasks with the same sequence token, one that will block on the
407  // event, and one which will just complete quickly when it's run. Since there
408  // is one worker thread free, the first task will start and then block, and
409  // the second task should be waiting.
410  ThreadBlocker blocker;
411  SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
412  pool()->PostSequencedWorkerTask(
413      token1, FROM_HERE,
414      base::Bind(&TestTracker::BlockTask, tracker(), 100, &blocker));
415  pool()->PostSequencedWorkerTask(
416      token1, FROM_HERE,
417      base::Bind(&TestTracker::FastTask, tracker(), 101));
418  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
419
420  // Create another two tasks as above with a different token. These will be
421  // blocked since there are no slots to run.
422  SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
423  pool()->PostSequencedWorkerTask(
424      token2, FROM_HERE,
425      base::Bind(&TestTracker::FastTask, tracker(), 200));
426  pool()->PostSequencedWorkerTask(
427      token2, FROM_HERE,
428      base::Bind(&TestTracker::FastTask, tracker(), 201));
429  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
430
431  // Let one background task complete. This should then let both tasks of
432  // token2 run to completion in order. The second task of token1 should still
433  // be blocked.
434  background_blocker.Unblock(1);
435  std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
436  ASSERT_EQ(3u, result.size());
437  EXPECT_EQ(200, result[1]);
438  EXPECT_EQ(201, result[2]);
439
440  // Finish the rest of the background tasks. This should leave some workers
441  // free with the second token1 task still blocked on the first.
442  background_blocker.Unblock(kNumBackgroundTasks - 1);
443  EXPECT_EQ(kNumBackgroundTasks + 2,
444            tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 2).size());
445
446  // Allow the first task of token1 to complete. This should run the second.
447  blocker.Unblock(1);
448  result = tracker()->WaitUntilTasksComplete(kNumBackgroundTasks + 4);
449  ASSERT_EQ(kNumBackgroundTasks + 4, result.size());
450  EXPECT_EQ(100, result[result.size() - 2]);
451  EXPECT_EQ(101, result[result.size() - 1]);
452}
453
454// Tests that any tasks posted after Shutdown are ignored.
455// Disabled for flakiness.  See http://crbug.com/166451.
456TEST_F(SequencedWorkerPoolTest, DISABLED_IgnoresAfterShutdown) {
457  // Start tasks to take all the threads and block them.
458  EnsureAllWorkersCreated();
459  ThreadBlocker blocker;
460  for (size_t i = 0; i < kNumWorkerThreads; i++) {
461    pool()->PostWorkerTask(FROM_HERE,
462                           base::Bind(&TestTracker::BlockTask,
463                                      tracker(), i, &blocker));
464  }
465  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
466
467  SetWillWaitForShutdownCallback(
468      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
469                 scoped_refptr<TestTracker>(tracker()), 0,
470                 &blocker, kNumWorkerThreads));
471
472  // Shutdown the worker pool. This should discard all non-blocking tasks.
473  const int kMaxNewBlockingTasksAfterShutdown = 100;
474  pool()->Shutdown(kMaxNewBlockingTasksAfterShutdown);
475
476  int old_has_work_call_count = has_work_call_count();
477
478  std::vector<int> result =
479      tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
480
481  // The kNumWorkerThread items should have completed, in no particular order.
482  ASSERT_EQ(kNumWorkerThreads, result.size());
483  for (size_t i = 0; i < kNumWorkerThreads; i++) {
484    EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
485                result.end());
486  }
487
488  // No further tasks, regardless of shutdown mode, should be allowed.
489  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
490      FROM_HERE,
491      base::Bind(&TestTracker::FastTask, tracker(), 100),
492      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
493  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
494      FROM_HERE,
495      base::Bind(&TestTracker::FastTask, tracker(), 101),
496      SequencedWorkerPool::SKIP_ON_SHUTDOWN));
497  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
498      FROM_HERE,
499      base::Bind(&TestTracker::FastTask, tracker(), 102),
500      SequencedWorkerPool::BLOCK_SHUTDOWN));
501
502  ASSERT_EQ(old_has_work_call_count, has_work_call_count());
503}
504
505TEST_F(SequencedWorkerPoolTest, AllowsAfterShutdown) {
506  // Test that <n> new blocking tasks are allowed provided they're posted
507  // by a running tasks.
508  EnsureAllWorkersCreated();
509  ThreadBlocker blocker;
510
511  // Start tasks to take all the threads and block them.
512  const int kNumBlockTasks = static_cast<int>(kNumWorkerThreads);
513  for (int i = 0; i < kNumBlockTasks; ++i) {
514    EXPECT_TRUE(pool()->PostWorkerTask(
515        FROM_HERE,
516        base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker)));
517  }
518  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
519
520  // Queue up shutdown blocking tasks behind those which will attempt to post
521  // additional tasks when run, PostAdditionalTasks attemtps to post 3
522  // new FastTasks, one for each shutdown_behavior.
523  const int kNumQueuedTasks = static_cast<int>(kNumWorkerThreads);
524  for (int i = 0; i < kNumQueuedTasks; ++i) {
525    EXPECT_TRUE(pool()->PostWorkerTaskWithShutdownBehavior(
526        FROM_HERE,
527        base::Bind(&TestTracker::PostAdditionalTasks, tracker(), i, pool(),
528                   false),
529        SequencedWorkerPool::BLOCK_SHUTDOWN));
530  }
531
532  // Setup to open the floodgates from within Shutdown().
533  SetWillWaitForShutdownCallback(
534      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
535                 scoped_refptr<TestTracker>(tracker()),
536                 0, &blocker, kNumBlockTasks));
537
538  // Allow half of the additional blocking tasks thru.
539  const int kNumNewBlockingTasksToAllow = kNumWorkerThreads / 2;
540  pool()->Shutdown(kNumNewBlockingTasksToAllow);
541
542  // Ensure that the correct number of tasks actually got run.
543  tracker()->WaitUntilTasksComplete(static_cast<size_t>(
544      kNumBlockTasks + kNumQueuedTasks + kNumNewBlockingTasksToAllow));
545
546  // Clean up the task IDs we added and go home.
547  tracker()->ClearCompleteSequence();
548}
549
550// Tests that unrun tasks are discarded properly according to their shutdown
551// mode.
552TEST_F(SequencedWorkerPoolTest, DiscardOnShutdown) {
553  // Start tasks to take all the threads and block them.
554  EnsureAllWorkersCreated();
555  ThreadBlocker blocker;
556  for (size_t i = 0; i < kNumWorkerThreads; i++) {
557    pool()->PostWorkerTask(FROM_HERE,
558                           base::Bind(&TestTracker::BlockTask,
559                                      tracker(), i, &blocker));
560  }
561  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
562
563  // Create some tasks with different shutdown modes.
564  pool()->PostWorkerTaskWithShutdownBehavior(
565      FROM_HERE,
566      base::Bind(&TestTracker::FastTask, tracker(), 100),
567      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
568  pool()->PostWorkerTaskWithShutdownBehavior(
569      FROM_HERE,
570      base::Bind(&TestTracker::FastTask, tracker(), 101),
571      SequencedWorkerPool::SKIP_ON_SHUTDOWN);
572  pool()->PostWorkerTaskWithShutdownBehavior(
573      FROM_HERE,
574      base::Bind(&TestTracker::FastTask, tracker(), 102),
575      SequencedWorkerPool::BLOCK_SHUTDOWN);
576
577  // Shutdown the worker pool. This should discard all non-blocking tasks.
578  SetWillWaitForShutdownCallback(
579      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
580                 scoped_refptr<TestTracker>(tracker()), 0,
581                 &blocker, kNumWorkerThreads));
582  pool()->Shutdown();
583
584  std::vector<int> result =
585      tracker()->WaitUntilTasksComplete(kNumWorkerThreads + 1);
586
587  // The kNumWorkerThread items should have completed, plus the BLOCK_SHUTDOWN
588  // one, in no particular order.
589  ASSERT_EQ(kNumWorkerThreads + 1, result.size());
590  for (size_t i = 0; i < kNumWorkerThreads; i++) {
591    EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
592                result.end());
593  }
594  EXPECT_TRUE(std::find(result.begin(), result.end(), 102) != result.end());
595}
596
597// Tests that CONTINUE_ON_SHUTDOWN tasks don't block shutdown.
598TEST_F(SequencedWorkerPoolTest, ContinueOnShutdown) {
599  scoped_refptr<TaskRunner> runner(pool()->GetTaskRunnerWithShutdownBehavior(
600      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
601  scoped_refptr<SequencedTaskRunner> sequenced_runner(
602      pool()->GetSequencedTaskRunnerWithShutdownBehavior(
603          pool()->GetSequenceToken(),
604          SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
605  EnsureAllWorkersCreated();
606  ThreadBlocker blocker;
607  pool()->PostWorkerTaskWithShutdownBehavior(
608      FROM_HERE,
609      base::Bind(&TestTracker::BlockTask,
610                 tracker(), 0, &blocker),
611      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
612  runner->PostTask(
613      FROM_HERE,
614      base::Bind(&TestTracker::BlockTask,
615                 tracker(), 1, &blocker));
616  sequenced_runner->PostTask(
617      FROM_HERE,
618      base::Bind(&TestTracker::BlockTask,
619                 tracker(), 2, &blocker));
620
621  tracker()->WaitUntilTasksBlocked(3);
622
623  // This should not block. If this test hangs, it means it failed.
624  pool()->Shutdown();
625
626  // The task should not have completed yet.
627  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
628
629  // Posting more tasks should fail.
630  EXPECT_FALSE(pool()->PostWorkerTaskWithShutdownBehavior(
631      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0),
632      SequencedWorkerPool::CONTINUE_ON_SHUTDOWN));
633  EXPECT_FALSE(runner->PostTask(
634      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
635  EXPECT_FALSE(sequenced_runner->PostTask(
636      FROM_HERE, base::Bind(&TestTracker::FastTask, tracker(), 0)));
637
638  // Continue the background thread and make sure the tasks can complete.
639  blocker.Unblock(3);
640  std::vector<int> result = tracker()->WaitUntilTasksComplete(3);
641  EXPECT_EQ(3u, result.size());
642}
643
644// Tests that SKIP_ON_SHUTDOWN tasks that have been started block Shutdown
645// until they stop, but tasks not yet started do not.
646TEST_F(SequencedWorkerPoolTest, SkipOnShutdown) {
647  // Start tasks to take all the threads and block them.
648  EnsureAllWorkersCreated();
649  ThreadBlocker blocker;
650
651  // Now block all the threads with SKIP_ON_SHUTDOWN. Shutdown() should not
652  // return until these tasks have completed.
653  for (size_t i = 0; i < kNumWorkerThreads; i++) {
654    pool()->PostWorkerTaskWithShutdownBehavior(
655        FROM_HERE,
656        base::Bind(&TestTracker::BlockTask, tracker(), i, &blocker),
657        SequencedWorkerPool::SKIP_ON_SHUTDOWN);
658  }
659  tracker()->WaitUntilTasksBlocked(kNumWorkerThreads);
660
661  // Now post an additional task as SKIP_ON_SHUTDOWN, which should not be
662  // executed once Shutdown() has been called.
663  pool()->PostWorkerTaskWithShutdownBehavior(
664      FROM_HERE,
665      base::Bind(&TestTracker::BlockTask,
666                 tracker(), 0, &blocker),
667      SequencedWorkerPool::SKIP_ON_SHUTDOWN);
668
669  // This callback will only be invoked if SKIP_ON_SHUTDOWN tasks that have
670  // been started block shutdown.
671  SetWillWaitForShutdownCallback(
672      base::Bind(&EnsureTasksToCompleteCountAndUnblock,
673                 scoped_refptr<TestTracker>(tracker()), 0,
674                 &blocker, kNumWorkerThreads));
675
676  // No tasks should have completed yet.
677  EXPECT_EQ(0u, tracker()->WaitUntilTasksComplete(0).size());
678
679  // This should not block. If this test hangs, it means it failed.
680  pool()->Shutdown();
681
682  // Shutdown should not return until all of the tasks have completed.
683  std::vector<int> result =
684      tracker()->WaitUntilTasksComplete(kNumWorkerThreads);
685
686  // Only tasks marked SKIP_ON_SHUTDOWN that were already started should be
687  // allowed to complete. No additional non-blocking tasks should have been
688  // started.
689  ASSERT_EQ(kNumWorkerThreads, result.size());
690  for (size_t i = 0; i < kNumWorkerThreads; i++) {
691    EXPECT_TRUE(std::find(result.begin(), result.end(), static_cast<int>(i)) !=
692                result.end());
693  }
694}
695
696// Ensure all worker threads are created, and then trigger a spurious
697// work signal. This shouldn't cause any other work signals to be
698// triggered. This is a regression test for http://crbug.com/117469.
699TEST_F(SequencedWorkerPoolTest, SpuriousWorkSignal) {
700  EnsureAllWorkersCreated();
701  int old_has_work_call_count = has_work_call_count();
702  pool()->SignalHasWorkForTesting();
703  // This is inherently racy, but can only produce false positives.
704  base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
705  EXPECT_EQ(old_has_work_call_count + 1, has_work_call_count());
706}
707
708void IsRunningOnCurrentThreadTask(
709    SequencedWorkerPool::SequenceToken test_positive_token,
710    SequencedWorkerPool::SequenceToken test_negative_token,
711    SequencedWorkerPool* pool,
712    SequencedWorkerPool* unused_pool) {
713  EXPECT_TRUE(pool->RunsTasksOnCurrentThread());
714  EXPECT_TRUE(pool->IsRunningSequenceOnCurrentThread(test_positive_token));
715  EXPECT_FALSE(pool->IsRunningSequenceOnCurrentThread(test_negative_token));
716  EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
717  EXPECT_FALSE(
718      unused_pool->IsRunningSequenceOnCurrentThread(test_positive_token));
719  EXPECT_FALSE(
720      unused_pool->IsRunningSequenceOnCurrentThread(test_negative_token));
721}
722
723// Verify correctness of the IsRunningSequenceOnCurrentThread method.
724TEST_F(SequencedWorkerPoolTest, IsRunningOnCurrentThread) {
725  SequencedWorkerPool::SequenceToken token1 = pool()->GetSequenceToken();
726  SequencedWorkerPool::SequenceToken token2 = pool()->GetSequenceToken();
727  SequencedWorkerPool::SequenceToken unsequenced_token;
728
729  scoped_refptr<SequencedWorkerPool> unused_pool =
730      new SequencedWorkerPool(2, "unused_pool");
731
732  EXPECT_FALSE(pool()->RunsTasksOnCurrentThread());
733  EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token1));
734  EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(token2));
735  EXPECT_FALSE(pool()->IsRunningSequenceOnCurrentThread(unsequenced_token));
736  EXPECT_FALSE(unused_pool->RunsTasksOnCurrentThread());
737  EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token1));
738  EXPECT_FALSE(unused_pool->IsRunningSequenceOnCurrentThread(token2));
739  EXPECT_FALSE(
740      unused_pool->IsRunningSequenceOnCurrentThread(unsequenced_token));
741
742  pool()->PostSequencedWorkerTask(
743      token1, FROM_HERE,
744      base::Bind(&IsRunningOnCurrentThreadTask,
745                 token1, token2, pool(), unused_pool));
746  pool()->PostSequencedWorkerTask(
747      token2, FROM_HERE,
748      base::Bind(&IsRunningOnCurrentThreadTask,
749                 token2, unsequenced_token, pool(), unused_pool));
750  pool()->PostWorkerTask(
751      FROM_HERE,
752      base::Bind(&IsRunningOnCurrentThreadTask,
753                 unsequenced_token, token1, pool(), unused_pool));
754  pool()->Shutdown();
755  unused_pool->Shutdown();
756}
757
758// Verify that FlushForTesting works as intended.
759TEST_F(SequencedWorkerPoolTest, FlushForTesting) {
760  // Should be fine to call on a new instance.
761  pool()->FlushForTesting();
762
763  // Queue up a bunch of work, including  a long delayed task and
764  // a task that produces additional tasks as an artifact.
765  pool()->PostDelayedWorkerTask(
766      FROM_HERE,
767      base::Bind(&TestTracker::FastTask, tracker(), 0),
768      TimeDelta::FromMinutes(5));
769  pool()->PostWorkerTask(FROM_HERE,
770                         base::Bind(&TestTracker::SlowTask, tracker(), 0));
771  const size_t kNumFastTasks = 20;
772  for (size_t i = 0; i < kNumFastTasks; i++) {
773    pool()->PostWorkerTask(FROM_HERE,
774                           base::Bind(&TestTracker::FastTask, tracker(), 0));
775  }
776  pool()->PostWorkerTask(
777      FROM_HERE,
778      base::Bind(&TestTracker::PostAdditionalTasks, tracker(), 0, pool(),
779                 true));
780
781  // We expect all except the delayed task to have been run. We verify all
782  // closures have been deleted by looking at the refcount of the
783  // tracker.
784  EXPECT_FALSE(tracker()->HasOneRef());
785  pool()->FlushForTesting();
786  EXPECT_TRUE(tracker()->HasOneRef());
787  EXPECT_EQ(1 + kNumFastTasks + 1 + 3, tracker()->GetTasksCompletedCount());
788
789  // Should be fine to call on an idle instance with all threads created, and
790  // spamming the method shouldn't deadlock or confuse the class.
791  pool()->FlushForTesting();
792  pool()->FlushForTesting();
793
794  // Should be fine to call after shutdown too.
795  pool()->Shutdown();
796  pool()->FlushForTesting();
797}
798
799TEST(SequencedWorkerPoolRefPtrTest, ShutsDownCleanWithContinueOnShutdown) {
800  MessageLoop loop;
801  scoped_refptr<SequencedWorkerPool> pool(new SequencedWorkerPool(3, "Pool"));
802  scoped_refptr<SequencedTaskRunner> task_runner =
803      pool->GetSequencedTaskRunnerWithShutdownBehavior(
804          pool->GetSequenceToken(),
805          base::SequencedWorkerPool::CONTINUE_ON_SHUTDOWN);
806
807  // Upon test exit, should shut down without hanging.
808  pool->Shutdown();
809}
810
811class SequencedWorkerPoolTaskRunnerTestDelegate {
812 public:
813  SequencedWorkerPoolTaskRunnerTestDelegate() {}
814
815  ~SequencedWorkerPoolTaskRunnerTestDelegate() {}
816
817  void StartTaskRunner() {
818    pool_owner_.reset(
819        new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
820  }
821
822  scoped_refptr<SequencedWorkerPool> GetTaskRunner() {
823    return pool_owner_->pool();
824  }
825
826  void StopTaskRunner() {
827    // Make sure all tasks are run before shutting down. Delayed tasks are
828    // not run, they're simply deleted.
829    pool_owner_->pool()->FlushForTesting();
830    pool_owner_->pool()->Shutdown();
831    // Don't reset |pool_owner_| here, as the test may still hold a
832    // reference to the pool.
833  }
834
835  bool TaskRunnerHandlesNonZeroDelays() const {
836    return true;
837  }
838
839 private:
840  MessageLoop message_loop_;
841  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
842};
843
844INSTANTIATE_TYPED_TEST_CASE_P(
845    SequencedWorkerPool, TaskRunnerTest,
846    SequencedWorkerPoolTaskRunnerTestDelegate);
847
848class SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate {
849 public:
850  SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {}
851
852  ~SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate() {
853  }
854
855  void StartTaskRunner() {
856    pool_owner_.reset(
857        new SequencedWorkerPoolOwner(10, "SequencedWorkerPoolTaskRunnerTest"));
858    task_runner_ = pool_owner_->pool()->GetTaskRunnerWithShutdownBehavior(
859        SequencedWorkerPool::BLOCK_SHUTDOWN);
860  }
861
862  scoped_refptr<TaskRunner> GetTaskRunner() {
863    return task_runner_;
864  }
865
866  void StopTaskRunner() {
867    // Make sure all tasks are run before shutting down. Delayed tasks are
868    // not run, they're simply deleted.
869    pool_owner_->pool()->FlushForTesting();
870    pool_owner_->pool()->Shutdown();
871    // Don't reset |pool_owner_| here, as the test may still hold a
872    // reference to the pool.
873  }
874
875  bool TaskRunnerHandlesNonZeroDelays() const {
876    return true;
877  }
878
879 private:
880  MessageLoop message_loop_;
881  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
882  scoped_refptr<TaskRunner> task_runner_;
883};
884
885INSTANTIATE_TYPED_TEST_CASE_P(
886    SequencedWorkerPoolTaskRunner, TaskRunnerTest,
887    SequencedWorkerPoolTaskRunnerWithShutdownBehaviorTestDelegate);
888
889class SequencedWorkerPoolSequencedTaskRunnerTestDelegate {
890 public:
891  SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {}
892
893  ~SequencedWorkerPoolSequencedTaskRunnerTestDelegate() {
894  }
895
896  void StartTaskRunner() {
897    pool_owner_.reset(new SequencedWorkerPoolOwner(
898        10, "SequencedWorkerPoolSequencedTaskRunnerTest"));
899    task_runner_ = pool_owner_->pool()->GetSequencedTaskRunner(
900        pool_owner_->pool()->GetSequenceToken());
901  }
902
903  scoped_refptr<SequencedTaskRunner> GetTaskRunner() {
904    return task_runner_;
905  }
906
907  void StopTaskRunner() {
908    // Make sure all tasks are run before shutting down. Delayed tasks are
909    // not run, they're simply deleted.
910    pool_owner_->pool()->FlushForTesting();
911    pool_owner_->pool()->Shutdown();
912    // Don't reset |pool_owner_| here, as the test may still hold a
913    // reference to the pool.
914  }
915
916  bool TaskRunnerHandlesNonZeroDelays() const {
917    return true;
918  }
919
920 private:
921  MessageLoop message_loop_;
922  scoped_ptr<SequencedWorkerPoolOwner> pool_owner_;
923  scoped_refptr<SequencedTaskRunner> task_runner_;
924};
925
926INSTANTIATE_TYPED_TEST_CASE_P(
927    SequencedWorkerPoolSequencedTaskRunner, TaskRunnerTest,
928    SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
929
930INSTANTIATE_TYPED_TEST_CASE_P(
931    SequencedWorkerPoolSequencedTaskRunner, SequencedTaskRunnerTest,
932    SequencedWorkerPoolSequencedTaskRunnerTestDelegate);
933
934}  // namespace
935
936}  // namespace base
937