1// Copyright (c) 2010 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "base/threading/simple_thread.h" 6 7#include "base/logging.h" 8#include "base/strings/string_number_conversions.h" 9#include "base/threading/platform_thread.h" 10#include "base/threading/thread_restrictions.h" 11 12namespace base { 13 14SimpleThread::SimpleThread(const std::string& name_prefix) 15 : SimpleThread(name_prefix, Options()) {} 16 17SimpleThread::SimpleThread(const std::string& name_prefix, 18 const Options& options) 19 : name_prefix_(name_prefix), 20 options_(options), 21 event_(WaitableEvent::ResetPolicy::MANUAL, 22 WaitableEvent::InitialState::NOT_SIGNALED) {} 23 24SimpleThread::~SimpleThread() { 25 DCHECK(HasBeenStarted()) << "SimpleThread was never started."; 26 DCHECK(!options_.joinable || HasBeenJoined()) 27 << "Joinable SimpleThread destroyed without being Join()ed."; 28} 29 30void SimpleThread::Start() { 31 DCHECK(!HasBeenStarted()) << "Tried to Start a thread multiple times."; 32 bool success = 33 options_.joinable 34 ? PlatformThread::CreateWithPriority(options_.stack_size, this, 35 &thread_, options_.priority) 36 : PlatformThread::CreateNonJoinableWithPriority( 37 options_.stack_size, this, options_.priority); 38 DCHECK(success); 39 ThreadRestrictions::ScopedAllowWait allow_wait; 40 event_.Wait(); // Wait for the thread to complete initialization. 41} 42 43void SimpleThread::Join() { 44 DCHECK(options_.joinable) << "A non-joinable thread can't be joined."; 45 DCHECK(HasBeenStarted()) << "Tried to Join a never-started thread."; 46 DCHECK(!HasBeenJoined()) << "Tried to Join a thread multiple times."; 47 PlatformThread::Join(thread_); 48 thread_ = PlatformThreadHandle(); 49 joined_ = true; 50} 51 52bool SimpleThread::HasBeenStarted() { 53 ThreadRestrictions::ScopedAllowWait allow_wait; 54 return event_.IsSignaled(); 55} 56 57void SimpleThread::ThreadMain() { 58 tid_ = PlatformThread::CurrentId(); 59 // Construct our full name of the form "name_prefix_/TID". 60 std::string name(name_prefix_); 61 name.push_back('/'); 62 name.append(IntToString(tid_)); 63 PlatformThread::SetName(name); 64 65 // We've initialized our new thread, signal that we're done to Start(). 66 event_.Signal(); 67 68 Run(); 69} 70 71DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, 72 const std::string& name_prefix) 73 : DelegateSimpleThread(delegate, name_prefix, Options()) {} 74 75DelegateSimpleThread::DelegateSimpleThread(Delegate* delegate, 76 const std::string& name_prefix, 77 const Options& options) 78 : SimpleThread(name_prefix, options), 79 delegate_(delegate) { 80 DCHECK(delegate_); 81} 82 83DelegateSimpleThread::~DelegateSimpleThread() = default; 84 85void DelegateSimpleThread::Run() { 86 DCHECK(delegate_) << "Tried to call Run without a delegate (called twice?)"; 87 88 // Non-joinable DelegateSimpleThreads are allowed to be deleted during Run(). 89 // Member state must not be accessed after invoking Run(). 90 Delegate* delegate = delegate_; 91 delegate_ = nullptr; 92 delegate->Run(); 93} 94 95DelegateSimpleThreadPool::DelegateSimpleThreadPool( 96 const std::string& name_prefix, 97 int num_threads) 98 : name_prefix_(name_prefix), 99 num_threads_(num_threads), 100 dry_(WaitableEvent::ResetPolicy::MANUAL, 101 WaitableEvent::InitialState::NOT_SIGNALED) {} 102 103DelegateSimpleThreadPool::~DelegateSimpleThreadPool() { 104 DCHECK(threads_.empty()); 105 DCHECK(delegates_.empty()); 106 DCHECK(!dry_.IsSignaled()); 107} 108 109void DelegateSimpleThreadPool::Start() { 110 DCHECK(threads_.empty()) << "Start() called with outstanding threads."; 111 for (int i = 0; i < num_threads_; ++i) { 112 DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_); 113 thread->Start(); 114 threads_.push_back(thread); 115 } 116} 117 118void DelegateSimpleThreadPool::JoinAll() { 119 DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads."; 120 121 // Tell all our threads to quit their worker loop. 122 AddWork(NULL, num_threads_); 123 124 // Join and destroy all the worker threads. 125 for (int i = 0; i < num_threads_; ++i) { 126 threads_[i]->Join(); 127 delete threads_[i]; 128 } 129 threads_.clear(); 130 DCHECK(delegates_.empty()); 131} 132 133void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) { 134 AutoLock locked(lock_); 135 for (int i = 0; i < repeat_count; ++i) 136 delegates_.push(delegate); 137 // If we were empty, signal that we have work now. 138 if (!dry_.IsSignaled()) 139 dry_.Signal(); 140} 141 142void DelegateSimpleThreadPool::Run() { 143 Delegate* work = NULL; 144 145 while (true) { 146 dry_.Wait(); 147 { 148 AutoLock locked(lock_); 149 if (!dry_.IsSignaled()) 150 continue; 151 152 DCHECK(!delegates_.empty()); 153 work = delegates_.front(); 154 delegates_.pop(); 155 156 // Signal to any other threads that we're currently out of work. 157 if (delegates_.empty()) 158 dry_.Reset(); 159 } 160 161 // A NULL delegate pointer signals us to quit. 162 if (!work) 163 break; 164 165 work->Run(); 166 } 167} 168 169} // namespace base 170