1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// Multi-threaded tests of ConditionVariable class.
6
7#include <time.h>
8#include <algorithm>
9#include <vector>
10
11#include "base/bind.h"
12#include "base/logging.h"
13#include "base/memory/scoped_ptr.h"
14#include "base/synchronization/condition_variable.h"
15#include "base/synchronization/lock.h"
16#include "base/synchronization/spin_wait.h"
17#include "base/threading/platform_thread.h"
18#include "base/threading/thread.h"
19#include "base/threading/thread_collision_warner.h"
20#include "base/time/time.h"
21#include "testing/gtest/include/gtest/gtest.h"
22#include "testing/platform_test.h"
23
24namespace base {
25
26namespace {
27//------------------------------------------------------------------------------
28// Define our test class, with several common variables.
29//------------------------------------------------------------------------------
30
31class ConditionVariableTest : public PlatformTest {
32 public:
33  const TimeDelta kZeroMs;
34  const TimeDelta kTenMs;
35  const TimeDelta kThirtyMs;
36  const TimeDelta kFortyFiveMs;
37  const TimeDelta kSixtyMs;
38  const TimeDelta kOneHundredMs;
39
40  ConditionVariableTest()
41      : kZeroMs(TimeDelta::FromMilliseconds(0)),
42        kTenMs(TimeDelta::FromMilliseconds(10)),
43        kThirtyMs(TimeDelta::FromMilliseconds(30)),
44        kFortyFiveMs(TimeDelta::FromMilliseconds(45)),
45        kSixtyMs(TimeDelta::FromMilliseconds(60)),
46        kOneHundredMs(TimeDelta::FromMilliseconds(100)) {
47  }
48};
49
50//------------------------------------------------------------------------------
// Define a class that will control activities in several multi-threaded tests.
// The general structure of multi-threaded tests is that a test case will
// construct an instance of a WorkQueue.  The WorkQueue will spin up some
// threads and control them throughout their lifetime, as well as maintaining
// a central repository of the worker threads' activity.  Finally, the
// WorkQueue will command the worker threads to terminate.  At that point, the
// test cases will validate that the WorkQueue has records showing that the
// desired activities were performed.
59//------------------------------------------------------------------------------
60
61// Callers are responsible for synchronizing access to the following class.
62// The WorkQueue::lock_, as accessed via WorkQueue::lock(), should be used for
63// all synchronized access.
64class WorkQueue : public PlatformThread::Delegate {
65 public:
66  explicit WorkQueue(int thread_count);
67  virtual ~WorkQueue();
68
69  // PlatformThread::Delegate interface.
70  virtual void ThreadMain() OVERRIDE;
71
72  //----------------------------------------------------------------------------
73  // Worker threads only call the following methods.
74  // They should use the lock to get exclusive access.
75  int GetThreadId();  // Get an ID assigned to a thread..
76  bool EveryIdWasAllocated() const;  // Indicates that all IDs were handed out.
77  TimeDelta GetAnAssignment(int thread_id);  // Get a work task duration.
78  void WorkIsCompleted(int thread_id);
79
80  int task_count() const;
81  bool allow_help_requests() const;  // Workers can signal more workers.
82  bool shutdown() const;  // Check if shutdown has been requested.
83
84  void thread_shutting_down();
85
86
87  //----------------------------------------------------------------------------
88  // Worker threads can call them but not needed to acquire a lock.
89  Lock* lock();
90
91  ConditionVariable* work_is_available();
92  ConditionVariable* all_threads_have_ids();
93  ConditionVariable* no_more_tasks();
94
95  //----------------------------------------------------------------------------
96  // The rest of the methods are for use by the controlling master thread (the
97  // test case code).
98  void ResetHistory();
99  int GetMinCompletionsByWorkerThread() const;
100  int GetMaxCompletionsByWorkerThread() const;
101  int GetNumThreadsTakingAssignments() const;
102  int GetNumThreadsCompletingTasks() const;
103  int GetNumberOfCompletedTasks() const;
104
105  void SetWorkTime(TimeDelta delay);
106  void SetTaskCount(int count);
107  void SetAllowHelp(bool allow);
108
109  // The following must be called without locking, and will spin wait until the
110  // threads are all in a wait state.
111  void SpinUntilAllThreadsAreWaiting();
112  void SpinUntilTaskCountLessThan(int task_count);
113
114  // Caller must acquire lock before calling.
115  void SetShutdown();
116
117  // Compares the |shutdown_task_count_| to the |thread_count| and returns true
118  // if they are equal.  This check will acquire the |lock_| so the caller
119  // should not hold the lock when calling this method.
120  bool ThreadSafeCheckShutdown(int thread_count);
121
122 private:
123  // Both worker threads and controller use the following to synchronize.
124  Lock lock_;
125  ConditionVariable work_is_available_;  // To tell threads there is work.
126
127  // Conditions to notify the controlling process (if it is interested).
128  ConditionVariable all_threads_have_ids_;  // All threads are running.
129  ConditionVariable no_more_tasks_;  // Task count is zero.
130
131  const int thread_count_;
132  int waiting_thread_count_;
133  scoped_ptr<PlatformThreadHandle[]> thread_handles_;
134  std::vector<int> assignment_history_;  // Number of assignment per worker.
135  std::vector<int> completion_history_;  // Number of completions per worker.
136  int thread_started_counter_;  // Used to issue unique id to workers.
137  int shutdown_task_count_;  // Number of tasks told to shutdown
138  int task_count_;  // Number of assignment tasks waiting to be processed.
139  TimeDelta worker_delay_;  // Time each task takes to complete.
140  bool allow_help_requests_;  // Workers can signal more workers.
141  bool shutdown_;  // Set when threads need to terminate.
142
143  DFAKE_MUTEX(locked_methods_);
144};
145
146//------------------------------------------------------------------------------
147// The next section contains the actual tests.
148//------------------------------------------------------------------------------
149
150TEST_F(ConditionVariableTest, StartupShutdownTest) {
151  Lock lock;
152
153  // First try trivial startup/shutdown.
154  {
155    ConditionVariable cv1(&lock);
156  }  // Call for cv1 destruction.
157
158  // Exercise with at least a few waits.
159  ConditionVariable cv(&lock);
160
161  lock.Acquire();
162  cv.TimedWait(kTenMs);  // Wait for 10 ms.
163  cv.TimedWait(kTenMs);  // Wait for 10 ms.
164  lock.Release();
165
166  lock.Acquire();
167  cv.TimedWait(kTenMs);  // Wait for 10 ms.
168  cv.TimedWait(kTenMs);  // Wait for 10 ms.
169  cv.TimedWait(kTenMs);  // Wait for 10 ms.
170  lock.Release();
171}  // Call for cv destruction.
172
173TEST_F(ConditionVariableTest, TimeoutTest) {
174  Lock lock;
175  ConditionVariable cv(&lock);
176  lock.Acquire();
177
178  TimeTicks start = TimeTicks::Now();
179  const TimeDelta WAIT_TIME = TimeDelta::FromMilliseconds(300);
180  // Allow for clocking rate granularity.
181  const TimeDelta FUDGE_TIME = TimeDelta::FromMilliseconds(50);
182
183  cv.TimedWait(WAIT_TIME + FUDGE_TIME);
184  TimeDelta duration = TimeTicks::Now() - start;
185  // We can't use EXPECT_GE here as the TimeDelta class does not support the
186  // required stream conversion.
187  EXPECT_TRUE(duration >= WAIT_TIME);
188
189  lock.Release();
190}
191
192#if defined(OS_POSIX)
193const int kDiscontinuitySeconds = 2;
194
195void BackInTime(Lock* lock) {
196  AutoLock auto_lock(*lock);
197
198  timeval tv;
199  gettimeofday(&tv, NULL);
200  tv.tv_sec -= kDiscontinuitySeconds;
201  settimeofday(&tv, NULL);
202}
203
204// Tests that TimedWait ignores changes to the system clock.
205// Test is disabled by default, because it needs to run as root to muck with the
206// system clock.
207// http://crbug.com/293736
208TEST_F(ConditionVariableTest, DISABLED_TimeoutAcrossSetTimeOfDay) {
209  timeval tv;
210  gettimeofday(&tv, NULL);
211  tv.tv_sec += kDiscontinuitySeconds;
212  if (settimeofday(&tv, NULL) < 0) {
213    PLOG(ERROR) << "Could not set time of day. Run as root?";
214    return;
215  }
216
217  Lock lock;
218  ConditionVariable cv(&lock);
219  lock.Acquire();
220
221  Thread thread("Helper");
222  thread.Start();
223  thread.message_loop()->PostTask(FROM_HERE, base::Bind(&BackInTime, &lock));
224
225  TimeTicks start = TimeTicks::Now();
226  const TimeDelta kWaitTime = TimeDelta::FromMilliseconds(300);
227  // Allow for clocking rate granularity.
228  const TimeDelta kFudgeTime = TimeDelta::FromMilliseconds(50);
229
230  cv.TimedWait(kWaitTime + kFudgeTime);
231  TimeDelta duration = TimeTicks::Now() - start;
232
233  thread.Stop();
234  // We can't use EXPECT_GE here as the TimeDelta class does not support the
235  // required stream conversion.
236  EXPECT_TRUE(duration >= kWaitTime);
237  EXPECT_TRUE(duration <= TimeDelta::FromSeconds(kDiscontinuitySeconds));
238
239  lock.Release();
240}
241#endif
242
243
244// Suddenly got flaky on Win, see http://crbug.com/10607 (starting at
245// comment #15).
246#if defined(OS_WIN)
247#define MAYBE_MultiThreadConsumerTest DISABLED_MultiThreadConsumerTest
248#else
249#define MAYBE_MultiThreadConsumerTest MultiThreadConsumerTest
250#endif
251// Test serial task servicing, as well as two parallel task servicing methods.
252TEST_F(ConditionVariableTest, MAYBE_MultiThreadConsumerTest) {
253  const int kThreadCount = 10;
254  WorkQueue queue(kThreadCount);  // Start the threads.
255
256  const int kTaskCount = 10;  // Number of tasks in each mini-test here.
257
258  Time start_time;  // Used to time task processing.
259
260  {
261    base::AutoLock auto_lock(*queue.lock());
262    while (!queue.EveryIdWasAllocated())
263      queue.all_threads_have_ids()->Wait();
264  }
265
266  // If threads aren't in a wait state, they may start to gobble up tasks in
267  // parallel, short-circuiting (breaking) this test.
268  queue.SpinUntilAllThreadsAreWaiting();
269
270  {
271    // Since we have no tasks yet, all threads should be waiting by now.
272    base::AutoLock auto_lock(*queue.lock());
273    EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
274    EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
275    EXPECT_EQ(0, queue.task_count());
276    EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
277    EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
278    EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());
279
280    // Set up to make each task include getting help from another worker, so
281    // so that the work gets done in paralell.
282    queue.ResetHistory();
283    queue.SetTaskCount(kTaskCount);
284    queue.SetWorkTime(kThirtyMs);
285    queue.SetAllowHelp(true);
286
287    start_time = Time::Now();
288  }
289
290  queue.work_is_available()->Signal();  // But each worker can signal another.
291  // Wait till we at least start to handle tasks (and we're not all waiting).
292  queue.SpinUntilTaskCountLessThan(kTaskCount);
293  // Wait to allow the all workers to get done.
294  queue.SpinUntilAllThreadsAreWaiting();
295
296  {
297    // Wait until all work tasks have at least been assigned.
298    base::AutoLock auto_lock(*queue.lock());
299    while (queue.task_count())
300      queue.no_more_tasks()->Wait();
301
302    // To avoid racy assumptions, we'll just assert that at least 2 threads
303    // did work.  We know that the first worker should have gone to sleep, and
304    // hence a second worker should have gotten an assignment.
305    EXPECT_LE(2, queue.GetNumThreadsTakingAssignments());
306    EXPECT_EQ(kTaskCount, queue.GetNumberOfCompletedTasks());
307
308    // Try to ask all workers to help, and only a few will do the work.
309    queue.ResetHistory();
310    queue.SetTaskCount(3);
311    queue.SetWorkTime(kThirtyMs);
312    queue.SetAllowHelp(false);
313  }
314  queue.work_is_available()->Broadcast();  // Make them all try.
315  // Wait till we at least start to handle tasks (and we're not all waiting).
316  queue.SpinUntilTaskCountLessThan(3);
317  // Wait to allow the 3 workers to get done.
318  queue.SpinUntilAllThreadsAreWaiting();
319
320  {
321    base::AutoLock auto_lock(*queue.lock());
322    EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
323    EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
324    EXPECT_EQ(0, queue.task_count());
325    EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
326    EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
327    EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
328
329    // Set up to make each task get help from another worker.
330    queue.ResetHistory();
331    queue.SetTaskCount(3);
332    queue.SetWorkTime(kThirtyMs);
333    queue.SetAllowHelp(true);  // Allow (unnecessary) help requests.
334  }
335  queue.work_is_available()->Broadcast();  // Signal all threads.
336  // Wait till we at least start to handle tasks (and we're not all waiting).
337  queue.SpinUntilTaskCountLessThan(3);
338  // Wait to allow the 3 workers to get done.
339  queue.SpinUntilAllThreadsAreWaiting();
340
341  {
342    base::AutoLock auto_lock(*queue.lock());
343    EXPECT_EQ(3, queue.GetNumThreadsTakingAssignments());
344    EXPECT_EQ(3, queue.GetNumThreadsCompletingTasks());
345    EXPECT_EQ(0, queue.task_count());
346    EXPECT_EQ(1, queue.GetMaxCompletionsByWorkerThread());
347    EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
348    EXPECT_EQ(3, queue.GetNumberOfCompletedTasks());
349
350    // Set up to make each task get help from another worker.
351    queue.ResetHistory();
352    queue.SetTaskCount(20);  // 2 tasks per thread.
353    queue.SetWorkTime(kThirtyMs);
354    queue.SetAllowHelp(true);
355  }
356  queue.work_is_available()->Signal();  // But each worker can signal another.
357  // Wait till we at least start to handle tasks (and we're not all waiting).
358  queue.SpinUntilTaskCountLessThan(20);
359  // Wait to allow the 10 workers to get done.
360  queue.SpinUntilAllThreadsAreWaiting();  // Should take about 60 ms.
361
362  {
363    base::AutoLock auto_lock(*queue.lock());
364    EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
365    EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
366    EXPECT_EQ(0, queue.task_count());
367    EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
368
369    // Same as last test, but with Broadcast().
370    queue.ResetHistory();
371    queue.SetTaskCount(20);  // 2 tasks per thread.
372    queue.SetWorkTime(kThirtyMs);
373    queue.SetAllowHelp(true);
374  }
375  queue.work_is_available()->Broadcast();
376  // Wait till we at least start to handle tasks (and we're not all waiting).
377  queue.SpinUntilTaskCountLessThan(20);
378  // Wait to allow the 10 workers to get done.
379  queue.SpinUntilAllThreadsAreWaiting();  // Should take about 60 ms.
380
381  {
382    base::AutoLock auto_lock(*queue.lock());
383    EXPECT_EQ(10, queue.GetNumThreadsTakingAssignments());
384    EXPECT_EQ(10, queue.GetNumThreadsCompletingTasks());
385    EXPECT_EQ(0, queue.task_count());
386    EXPECT_EQ(20, queue.GetNumberOfCompletedTasks());
387
388    queue.SetShutdown();
389  }
390  queue.work_is_available()->Broadcast();  // Force check for shutdown.
391
392  SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
393                                   queue.ThreadSafeCheckShutdown(kThreadCount));
394}
395
396TEST_F(ConditionVariableTest, LargeFastTaskTest) {
397  const int kThreadCount = 200;
398  WorkQueue queue(kThreadCount);  // Start the threads.
399
400  Lock private_lock;  // Used locally for master to wait.
401  base::AutoLock private_held_lock(private_lock);
402  ConditionVariable private_cv(&private_lock);
403
404  {
405    base::AutoLock auto_lock(*queue.lock());
406    while (!queue.EveryIdWasAllocated())
407      queue.all_threads_have_ids()->Wait();
408  }
409
410  // Wait a bit more to allow threads to reach their wait state.
411  queue.SpinUntilAllThreadsAreWaiting();
412
413  {
414    // Since we have no tasks, all threads should be waiting by now.
415    base::AutoLock auto_lock(*queue.lock());
416    EXPECT_EQ(0, queue.GetNumThreadsTakingAssignments());
417    EXPECT_EQ(0, queue.GetNumThreadsCompletingTasks());
418    EXPECT_EQ(0, queue.task_count());
419    EXPECT_EQ(0, queue.GetMaxCompletionsByWorkerThread());
420    EXPECT_EQ(0, queue.GetMinCompletionsByWorkerThread());
421    EXPECT_EQ(0, queue.GetNumberOfCompletedTasks());
422
423    // Set up to make all workers do (an average of) 20 tasks.
424    queue.ResetHistory();
425    queue.SetTaskCount(20 * kThreadCount);
426    queue.SetWorkTime(kFortyFiveMs);
427    queue.SetAllowHelp(false);
428  }
429  queue.work_is_available()->Broadcast();  // Start up all threads.
430  // Wait until we've handed out all tasks.
431  {
432    base::AutoLock auto_lock(*queue.lock());
433    while (queue.task_count() != 0)
434      queue.no_more_tasks()->Wait();
435  }
436
437  // Wait till the last of the tasks complete.
438  queue.SpinUntilAllThreadsAreWaiting();
439
440  {
441    // With Broadcast(), every thread should have participated.
442    // but with racing.. they may not all have done equal numbers of tasks.
443    base::AutoLock auto_lock(*queue.lock());
444    EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
445    EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
446    EXPECT_EQ(0, queue.task_count());
447    EXPECT_LE(20, queue.GetMaxCompletionsByWorkerThread());
448    EXPECT_EQ(20 * kThreadCount, queue.GetNumberOfCompletedTasks());
449
450    // Set up to make all workers do (an average of) 4 tasks.
451    queue.ResetHistory();
452    queue.SetTaskCount(kThreadCount * 4);
453    queue.SetWorkTime(kFortyFiveMs);
454    queue.SetAllowHelp(true);  // Might outperform Broadcast().
455  }
456  queue.work_is_available()->Signal();  // Start up one thread.
457
458  // Wait until we've handed out all tasks
459  {
460    base::AutoLock auto_lock(*queue.lock());
461    while (queue.task_count() != 0)
462      queue.no_more_tasks()->Wait();
463  }
464
465  // Wait till the last of the tasks complete.
466  queue.SpinUntilAllThreadsAreWaiting();
467
468  {
469    // With Signal(), every thread should have participated.
470    // but with racing.. they may not all have done four tasks.
471    base::AutoLock auto_lock(*queue.lock());
472    EXPECT_EQ(kThreadCount, queue.GetNumThreadsTakingAssignments());
473    EXPECT_EQ(kThreadCount, queue.GetNumThreadsCompletingTasks());
474    EXPECT_EQ(0, queue.task_count());
475    EXPECT_LE(4, queue.GetMaxCompletionsByWorkerThread());
476    EXPECT_EQ(4 * kThreadCount, queue.GetNumberOfCompletedTasks());
477
478    queue.SetShutdown();
479  }
480  queue.work_is_available()->Broadcast();  // Force check for shutdown.
481
482  // Wait for shutdowns to complete.
483  SPIN_FOR_TIMEDELTA_OR_UNTIL_TRUE(TimeDelta::FromMinutes(1),
484                                   queue.ThreadSafeCheckShutdown(kThreadCount));
485}
486
487//------------------------------------------------------------------------------
488// Finally we provide the implementation for the methods in the WorkQueue class.
489//------------------------------------------------------------------------------
490
491WorkQueue::WorkQueue(int thread_count)
492  : lock_(),
493    work_is_available_(&lock_),
494    all_threads_have_ids_(&lock_),
495    no_more_tasks_(&lock_),
496    thread_count_(thread_count),
497    waiting_thread_count_(0),
498    thread_handles_(new PlatformThreadHandle[thread_count]),
499    assignment_history_(thread_count),
500    completion_history_(thread_count),
501    thread_started_counter_(0),
502    shutdown_task_count_(0),
503    task_count_(0),
504    allow_help_requests_(false),
505    shutdown_(false) {
506  EXPECT_GE(thread_count_, 1);
507  ResetHistory();
508  SetTaskCount(0);
509  SetWorkTime(TimeDelta::FromMilliseconds(30));
510
511  for (int i = 0; i < thread_count_; ++i) {
512    PlatformThreadHandle pth;
513    EXPECT_TRUE(PlatformThread::Create(0, this, &pth));
514    thread_handles_[i] = pth;
515  }
516}
517
518WorkQueue::~WorkQueue() {
519  {
520    base::AutoLock auto_lock(lock_);
521    SetShutdown();
522  }
523  work_is_available_.Broadcast();  // Tell them all to terminate.
524
525  for (int i = 0; i < thread_count_; ++i) {
526    PlatformThread::Join(thread_handles_[i]);
527  }
528  EXPECT_EQ(0, waiting_thread_count_);
529}
530
531int WorkQueue::GetThreadId() {
532  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
533  DCHECK(!EveryIdWasAllocated());
534  return thread_started_counter_++;  // Give out Unique IDs.
535}
536
537bool WorkQueue::EveryIdWasAllocated() const {
538  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
539  return thread_count_ == thread_started_counter_;
540}
541
542TimeDelta WorkQueue::GetAnAssignment(int thread_id) {
543  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
544  DCHECK_LT(0, task_count_);
545  assignment_history_[thread_id]++;
546  if (0 == --task_count_) {
547    no_more_tasks_.Signal();
548  }
549  return worker_delay_;
550}
551
552void WorkQueue::WorkIsCompleted(int thread_id) {
553  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
554  completion_history_[thread_id]++;
555}
556
557int WorkQueue::task_count() const {
558  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
559  return task_count_;
560}
561
562bool WorkQueue::allow_help_requests() const {
563  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
564  return allow_help_requests_;
565}
566
567bool WorkQueue::shutdown() const {
568  lock_.AssertAcquired();
569  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
570  return shutdown_;
571}
572
573// Because this method is called from the test's main thread we need to actually
574// take the lock.  Threads will call the thread_shutting_down() method with the
575// lock already acquired.
576bool WorkQueue::ThreadSafeCheckShutdown(int thread_count) {
577  bool all_shutdown;
578  base::AutoLock auto_lock(lock_);
579  {
580    // Declare in scope so DFAKE is guranteed to be destroyed before AutoLock.
581    DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
582    all_shutdown = (shutdown_task_count_ == thread_count);
583  }
584  return all_shutdown;
585}
586
587void WorkQueue::thread_shutting_down() {
588  lock_.AssertAcquired();
589  DFAKE_SCOPED_RECURSIVE_LOCK(locked_methods_);
590  shutdown_task_count_++;
591}
592
593Lock* WorkQueue::lock() {
594  return &lock_;
595}
596
597ConditionVariable* WorkQueue::work_is_available() {
598  return &work_is_available_;
599}
600
601ConditionVariable* WorkQueue::all_threads_have_ids() {
602  return &all_threads_have_ids_;
603}
604
605ConditionVariable* WorkQueue::no_more_tasks() {
606  return &no_more_tasks_;
607}
608
609void WorkQueue::ResetHistory() {
610  for (int i = 0; i < thread_count_; ++i) {
611    assignment_history_[i] = 0;
612    completion_history_[i] = 0;
613  }
614}
615
616int WorkQueue::GetMinCompletionsByWorkerThread() const {
617  int minumum = completion_history_[0];
618  for (int i = 0; i < thread_count_; ++i)
619    minumum = std::min(minumum, completion_history_[i]);
620  return minumum;
621}
622
623int WorkQueue::GetMaxCompletionsByWorkerThread() const {
624  int maximum = completion_history_[0];
625  for (int i = 0; i < thread_count_; ++i)
626    maximum = std::max(maximum, completion_history_[i]);
627  return maximum;
628}
629
630int WorkQueue::GetNumThreadsTakingAssignments() const {
631  int count = 0;
632  for (int i = 0; i < thread_count_; ++i)
633    if (assignment_history_[i])
634      count++;
635  return count;
636}
637
638int WorkQueue::GetNumThreadsCompletingTasks() const {
639  int count = 0;
640  for (int i = 0; i < thread_count_; ++i)
641    if (completion_history_[i])
642      count++;
643  return count;
644}
645
646int WorkQueue::GetNumberOfCompletedTasks() const {
647  int total = 0;
648  for (int i = 0; i < thread_count_; ++i)
649    total += completion_history_[i];
650  return total;
651}
652
653void WorkQueue::SetWorkTime(TimeDelta delay) {
654  worker_delay_ = delay;
655}
656
657void WorkQueue::SetTaskCount(int count) {
658  task_count_ = count;
659}
660
661void WorkQueue::SetAllowHelp(bool allow) {
662  allow_help_requests_ = allow;
663}
664
665void WorkQueue::SetShutdown() {
666  lock_.AssertAcquired();
667  shutdown_ = true;
668}
669
670void WorkQueue::SpinUntilAllThreadsAreWaiting() {
671  while (true) {
672    {
673      base::AutoLock auto_lock(lock_);
674      if (waiting_thread_count_ == thread_count_)
675        break;
676    }
677    PlatformThread::Sleep(TimeDelta::FromMilliseconds(30));
678  }
679}
680
681void WorkQueue::SpinUntilTaskCountLessThan(int task_count) {
682  while (true) {
683    {
684      base::AutoLock auto_lock(lock_);
685      if (task_count_ < task_count)
686        break;
687    }
688    PlatformThread::Sleep(TimeDelta::FromMilliseconds(30));
689  }
690}
691
692
693//------------------------------------------------------------------------------
694// Define the standard worker task. Several tests will spin out many of these
695// threads.
696//------------------------------------------------------------------------------
697
698// The multithread tests involve several threads with a task to perform as
699// directed by an instance of the class WorkQueue.
700// The task is to:
701// a) Check to see if there are more tasks (there is a task counter).
702//    a1) Wait on condition variable if there are no tasks currently.
703// b) Call a function to see what should be done.
704// c) Do some computation based on the number of milliseconds returned in (b).
705// d) go back to (a).
706
707// WorkQueue::ThreadMain() implements the above task for all threads.
708// It calls the controlling object to tell the creator about progress, and to
709// ask about tasks.
710
711void WorkQueue::ThreadMain() {
712  int thread_id;
713  {
714    base::AutoLock auto_lock(lock_);
715    thread_id = GetThreadId();
716    if (EveryIdWasAllocated())
717      all_threads_have_ids()->Signal();  // Tell creator we're ready.
718  }
719
720  Lock private_lock;  // Used to waste time on "our work".
721  while (1) {  // This is the main consumer loop.
722    TimeDelta work_time;
723    bool could_use_help;
724    {
725      base::AutoLock auto_lock(lock_);
726      while (0 == task_count() && !shutdown()) {
727        ++waiting_thread_count_;
728        work_is_available()->Wait();
729        --waiting_thread_count_;
730      }
731      if (shutdown()) {
732        // Ack the notification of a shutdown message back to the controller.
733        thread_shutting_down();
734        return;  // Terminate.
735      }
736      // Get our task duration from the queue.
737      work_time = GetAnAssignment(thread_id);
738      could_use_help = (task_count() > 0) && allow_help_requests();
739    }  // Release lock
740
741    // Do work (outside of locked region.
742    if (could_use_help)
743      work_is_available()->Signal();  // Get help from other threads.
744
745    if (work_time > TimeDelta::FromMilliseconds(0)) {
746      // We could just sleep(), but we'll instead further exercise the
747      // condition variable class, and do a timed wait.
748      base::AutoLock auto_lock(private_lock);
749      ConditionVariable private_cv(&private_lock);
750      private_cv.TimedWait(work_time);  // Unsynchronized waiting.
751    }
752
753    {
754      base::AutoLock auto_lock(lock_);
755      // Send notification that we completed our "work."
756      WorkIsCompleted(thread_id);
757    }
758  }
759}
760
761}  // namespace
762
763}  // namespace base
764