1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/simple_thread.h"
6
7#include "base/logging.h"
8#include "base/strings/string_number_conversions.h"
9#include "base/threading/platform_thread.h"
10#include "base/threading/thread_restrictions.h"
11
12namespace base {
13
14SimpleThread::SimpleThread(const std::string& name_prefix)
15    : name_prefix_(name_prefix), name_(name_prefix),
16      thread_(), event_(true, false), tid_(0), joined_(false) {
17}
18
19SimpleThread::SimpleThread(const std::string& name_prefix,
20                           const Options& options)
21    : name_prefix_(name_prefix), name_(name_prefix), options_(options),
22      thread_(), event_(true, false), tid_(0), joined_(false) {
23}
24
25SimpleThread::~SimpleThread() {
26  DCHECK(HasBeenStarted()) << "SimpleThread was never started.";
27  DCHECK(HasBeenJoined()) << "SimpleThread destroyed without being Join()ed.";
28}
29
30void SimpleThread::Start() {
31  DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times.";
32  bool success;
33  if (options_.priority() == ThreadPriority::NORMAL) {
34    success = PlatformThread::Create(options_.stack_size(), this, &thread_);
35  } else {
36    success = PlatformThread::CreateWithPriority(options_.stack_size(), this,
37                                                 &thread_, options_.priority());
38  }
39  DCHECK(success);
40  base::ThreadRestrictions::ScopedAllowWait allow_wait;
41  event_.Wait();  // Wait for the thread to complete initialization.
42}
43
44void SimpleThread::Join() {
45  DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread.";
46  DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times.";
47  PlatformThread::Join(thread_);
48  joined_ = true;
49}
50
51bool SimpleThread::HasBeenStarted() {
52  base::ThreadRestrictions::ScopedAllowWait allow_wait;
53  return event_.IsSignaled();
54}
55
56void SimpleThread::ThreadMain() {
57  tid_ = PlatformThread::CurrentId();
58  // Construct our full name of the form "name_prefix_/TID".
59  name_.push_back('/');
60  name_.append(IntToString(tid_));
61  PlatformThread::SetName(name_);
62
63  // We've initialized our new thread, signal that we're done to Start().
64  event_.Signal();
65
66  Run();
67}
68
69DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
70                                           const std::string& name_prefix)
71    : SimpleThread(name_prefix),
72      delegate_(delegate) {
73}
74
75DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
76                                           const std::string& name_prefix,
77                                           const Options& options)
78    : SimpleThread(name_prefix, options),
79      delegate_(delegate) {
80}
81
82DelegateSimpleThread::~DelegateSimpleThread() {
83}
84
85void DelegateSimpleThread::Run() {
86  DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)";
87  delegate_->Run();
88  delegate_ = NULL;
89}
90
91DelegateSimpleThreadPool::DelegateSimpleThreadPool(
92    const std::string& name_prefix,
93    int num_threads)
94    : name_prefix_(name_prefix),
95      num_threads_(num_threads),
96      dry_(true, false) {
97}
98
99DelegateSimpleThreadPool::~DelegateSimpleThreadPool() {
100  DCHECK(threads_.empty());
101  DCHECK(delegates_.empty());
102  DCHECK(!dry_.IsSignaled());
103}
104
105void DelegateSimpleThreadPool::Start() {
106  DCHECK(threads_.empty()) << "Start() called with outstanding threads.";
107  for (int i = 0; i < num_threads_; ++i) {
108    DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_);
109    thread->Start();
110    threads_.push_back(thread);
111  }
112}
113
114void DelegateSimpleThreadPool::JoinAll() {
115  DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads.";
116
117  // Tell all our threads to quit their worker loop.
118  AddWork(NULL, num_threads_);
119
120  // Join and destroy all the worker threads.
121  for (int i = 0; i < num_threads_; ++i) {
122    threads_[i]->Join();
123    delete threads_[i];
124  }
125  threads_.clear();
126  DCHECK(delegates_.empty());
127}
128
129void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) {
130  AutoLock locked(lock_);
131  for (int i = 0; i < repeat_count; ++i)
132    delegates_.push(delegate);
133  // If we were empty, signal that we have work now.
134  if (!dry_.IsSignaled())
135    dry_.Signal();
136}
137
138void DelegateSimpleThreadPool::Run() {
139  Delegate* work = NULL;
140
141  while (true) {
142    dry_.Wait();
143    {
144      AutoLock locked(lock_);
145      if (!dry_.IsSignaled())
146        continue;
147
148      DCHECK(!delegates_.empty());
149      work = delegates_.front();
150      delegates_.pop();
151
152      // Signal to any other threads that we're currently out of work.
153      if (delegates_.empty())
154        dry_.Reset();
155    }
156
157    // A NULL delegate pointer signals us to quit.
158    if (!work)
159      break;
160
161    work->Run();
162  }
163}
164
165}  // namespace base
166