1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/threading/simple_thread.h"
6
7#include "base/logging.h"
8#include "base/strings/string_number_conversions.h"
9#include "base/threading/platform_thread.h"
10#include "base/threading/thread_restrictions.h"
11
12namespace base {
13
14SimpleThread::SimpleThread(const std::string& name_prefix)
15    : SimpleThread(name_prefix, Options()) {}
16
17SimpleThread::SimpleThread(const std::string& name_prefix,
18                           const Options& options)
19    : name_prefix_(name_prefix),
20      options_(options),
21      event_(WaitableEvent::ResetPolicy::MANUAL,
22             WaitableEvent::InitialState::NOT_SIGNALED) {}
23
24SimpleThread::~SimpleThread() {
25  DCHECK(HasBeenStarted()) << "SimpleThread was never started.";
26  DCHECK(!options_.joinable || HasBeenJoined())
27      << "Joinable SimpleThread destroyed without being Join()ed.";
28}
29
30void SimpleThread::Start() {
31  DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times.";
32  bool success =
33      options_.joinable
34          ? PlatformThread::CreateWithPriority(options_.stack_size, this,
35                                               &thread_, options_.priority)
36          : PlatformThread::CreateNonJoinableWithPriority(
37                options_.stack_size, this, options_.priority);
38  DCHECK(success);
39  ThreadRestrictions::ScopedAllowWait allow_wait;
40  event_.Wait();  // Wait for the thread to complete initialization.
41}
42
43void SimpleThread::Join() {
44  DCHECK(options_.joinable) << "A non-joinable thread can't be joined.";
45  DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread.";
46  DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times.";
47  PlatformThread::Join(thread_);
48  thread_ = PlatformThreadHandle();
49  joined_ = true;
50}
51
52bool SimpleThread::HasBeenStarted() {
53  ThreadRestrictions::ScopedAllowWait allow_wait;
54  return event_.IsSignaled();
55}
56
57void SimpleThread::ThreadMain() {
58  tid_ = PlatformThread::CurrentId();
59  // Construct our full name of the form "name_prefix_/TID".
60  std::string name(name_prefix_);
61  name.push_back('/');
62  name.append(IntToString(tid_));
63  PlatformThread::SetName(name);
64
65  // We've initialized our new thread, signal that we're done to Start().
66  event_.Signal();
67
68  Run();
69}
70
71DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
72                                           const std::string& name_prefix)
73    : DelegateSimpleThread(delegate, name_prefix, Options()) {}
74
75DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate,
76                                           const std::string& name_prefix,
77                                           const Options& options)
78    : SimpleThread(name_prefix, options),
79      delegate_(delegate) {
80  DCHECK(delegate_);
81}
82
83DelegateSimpleThread::~DelegateSimpleThread() = default;
84
85void DelegateSimpleThread::Run() {
86  DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)";
87
88  // Non-joinable DelegateSimpleThreads are allowed to be deleted during Run().
89  // Member state must not be accessed after invoking Run().
90  Delegate* delegate = delegate_;
91  delegate_ = nullptr;
92  delegate->Run();
93}
94
95DelegateSimpleThreadPool::DelegateSimpleThreadPool(
96    const std::string& name_prefix,
97    int num_threads)
98    : name_prefix_(name_prefix),
99      num_threads_(num_threads),
100      dry_(WaitableEvent::ResetPolicy::MANUAL,
101           WaitableEvent::InitialState::NOT_SIGNALED) {}
102
103DelegateSimpleThreadPool::~DelegateSimpleThreadPool() {
104  DCHECK(threads_.empty());
105  DCHECK(delegates_.empty());
106  DCHECK(!dry_.IsSignaled());
107}
108
109void DelegateSimpleThreadPool::Start() {
110  DCHECK(threads_.empty()) << "Start() called with outstanding threads.";
111  for (int i = 0; i < num_threads_; ++i) {
112    DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_);
113    thread->Start();
114    threads_.push_back(thread);
115  }
116}
117
118void DelegateSimpleThreadPool::JoinAll() {
119  DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads.";
120
121  // Tell all our threads to quit their worker loop.
122  AddWork(NULL, num_threads_);
123
124  // Join and destroy all the worker threads.
125  for (int i = 0; i < num_threads_; ++i) {
126    threads_[i]->Join();
127    delete threads_[i];
128  }
129  threads_.clear();
130  DCHECK(delegates_.empty());
131}
132
133void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) {
134  AutoLock locked(lock_);
135  for (int i = 0; i < repeat_count; ++i)
136    delegates_.push(delegate);
137  // If we were empty, signal that we have work now.
138  if (!dry_.IsSignaled())
139    dry_.Signal();
140}
141
142void DelegateSimpleThreadPool::Run() {
143  Delegate* work = NULL;
144
145  while (true) {
146    dry_.Wait();
147    {
148      AutoLock locked(lock_);
149      if (!dry_.IsSignaled())
150        continue;
151
152      DCHECK(!delegates_.empty());
153      work = delegates_.front();
154      delegates_.pop();
155
156      // Signal to any other threads that we're currently out of work.
157      if (delegates_.empty())
158        dry_.Reset();
159    }
160
161    // A NULL delegate pointer signals us to quit.
162    if (!work)
163      break;
164
165    work->Run();
166  }
167}
168
169}  // namespace base
170