// sequenced_worker_pool.cc revision eb525c5499e34cc9c4b825d6d9e75bb07cc06ace
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "base/threading/sequenced_worker_pool.h" 6 7#include <list> 8#include <map> 9#include <set> 10#include <utility> 11#include <vector> 12 13#include "base/atomicops.h" 14#include "base/callback.h" 15#include "base/compiler_specific.h" 16#include "base/critical_closure.h" 17#include "base/debug/trace_event.h" 18#include "base/logging.h" 19#include "base/memory/linked_ptr.h" 20#include "base/message_loop/message_loop_proxy.h" 21#include "base/metrics/histogram.h" 22#include "base/stl_util.h" 23#include "base/strings/stringprintf.h" 24#include "base/synchronization/condition_variable.h" 25#include "base/synchronization/lock.h" 26#include "base/threading/platform_thread.h" 27#include "base/threading/simple_thread.h" 28#include "base/threading/thread_restrictions.h" 29#include "base/time/time.h" 30#include "base/tracked_objects.h" 31 32#if defined(OS_MACOSX) 33#include "base/mac/scoped_nsautorelease_pool.h" 34#endif 35 36namespace base { 37 38namespace { 39 40struct SequencedTask : public TrackingInfo { 41 SequencedTask() 42 : sequence_token_id(0), 43 trace_id(0), 44 sequence_task_number(0), 45 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 46 47 explicit SequencedTask(const tracked_objects::Location& from_here) 48 : base::TrackingInfo(from_here, TimeTicks()), 49 sequence_token_id(0), 50 trace_id(0), 51 sequence_task_number(0), 52 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 53 54 ~SequencedTask() {} 55 56 int sequence_token_id; 57 int trace_id; 58 int64 sequence_task_number; 59 SequencedWorkerPool::WorkerShutdown shutdown_behavior; 60 tracked_objects::Location posted_from; 61 Closure task; 62 63 // Non-delayed tasks and delayed tasks are managed together by time-to-run 64 // order. We calculate the time by adding the posted time and the given delay. 
65 TimeTicks time_to_run; 66}; 67 68struct SequencedTaskLessThan { 69 public: 70 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { 71 if (lhs.time_to_run < rhs.time_to_run) 72 return true; 73 74 if (lhs.time_to_run > rhs.time_to_run) 75 return false; 76 77 // If the time happen to match, then we use the sequence number to decide. 78 return lhs.sequence_task_number < rhs.sequence_task_number; 79 } 80}; 81 82// SequencedWorkerPoolTaskRunner --------------------------------------------- 83// A TaskRunner which posts tasks to a SequencedWorkerPool with a 84// fixed ShutdownBehavior. 85// 86// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 87class SequencedWorkerPoolTaskRunner : public TaskRunner { 88 public: 89 SequencedWorkerPoolTaskRunner( 90 const scoped_refptr<SequencedWorkerPool>& pool, 91 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 92 93 // TaskRunner implementation 94 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 95 const Closure& task, 96 TimeDelta delay) OVERRIDE; 97 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 98 99 private: 100 virtual ~SequencedWorkerPoolTaskRunner(); 101 102 const scoped_refptr<SequencedWorkerPool> pool_; 103 104 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 105 106 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); 107}; 108 109SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( 110 const scoped_refptr<SequencedWorkerPool>& pool, 111 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 112 : pool_(pool), 113 shutdown_behavior_(shutdown_behavior) { 114} 115 116SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { 117} 118 119bool SequencedWorkerPoolTaskRunner::PostDelayedTask( 120 const tracked_objects::Location& from_here, 121 const Closure& task, 122 TimeDelta delay) { 123 if (delay == TimeDelta()) { 124 return pool_->PostWorkerTaskWithShutdownBehavior( 125 from_here, task, 
shutdown_behavior_); 126 } 127 return pool_->PostDelayedWorkerTask(from_here, task, delay); 128} 129 130bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { 131 return pool_->RunsTasksOnCurrentThread(); 132} 133 134// SequencedWorkerPoolSequencedTaskRunner ------------------------------------ 135// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a 136// fixed sequence token. 137// 138// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 139class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { 140 public: 141 SequencedWorkerPoolSequencedTaskRunner( 142 const scoped_refptr<SequencedWorkerPool>& pool, 143 SequencedWorkerPool::SequenceToken token, 144 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 145 146 // TaskRunner implementation 147 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 148 const Closure& task, 149 TimeDelta delay) OVERRIDE; 150 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 151 152 // SequencedTaskRunner implementation 153 virtual bool PostNonNestableDelayedTask( 154 const tracked_objects::Location& from_here, 155 const Closure& task, 156 TimeDelta delay) OVERRIDE; 157 158 private: 159 virtual ~SequencedWorkerPoolSequencedTaskRunner(); 160 161 const scoped_refptr<SequencedWorkerPool> pool_; 162 163 const SequencedWorkerPool::SequenceToken token_; 164 165 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 166 167 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner); 168}; 169 170SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner( 171 const scoped_refptr<SequencedWorkerPool>& pool, 172 SequencedWorkerPool::SequenceToken token, 173 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 174 : pool_(pool), 175 token_(token), 176 shutdown_behavior_(shutdown_behavior) { 177} 178 179SequencedWorkerPoolSequencedTaskRunner:: 180~SequencedWorkerPoolSequencedTaskRunner() { 181} 182 183bool 
SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( 184 const tracked_objects::Location& from_here, 185 const Closure& task, 186 TimeDelta delay) { 187 if (delay == TimeDelta()) { 188 return pool_->PostSequencedWorkerTaskWithShutdownBehavior( 189 token_, from_here, task, shutdown_behavior_); 190 } 191 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay); 192} 193 194bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const { 195 return pool_->IsRunningSequenceOnCurrentThread(token_); 196} 197 198bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask( 199 const tracked_objects::Location& from_here, 200 const Closure& task, 201 TimeDelta delay) { 202 // There's no way to run nested tasks, so simply forward to 203 // PostDelayedTask. 204 return PostDelayedTask(from_here, task, delay); 205} 206 207// Create a process-wide unique ID to represent this task in trace events. This 208// will be mangled with a Process ID hash to reduce the likelyhood of colliding 209// with MessageLoop pointers on other processes. 210uint64 GetTaskTraceID(const SequencedTask& task, 211 void* pool) { 212 return (static_cast<uint64>(task.trace_id) << 32) | 213 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); 214} 215 216} // namespace 217 218// Worker --------------------------------------------------------------------- 219 220class SequencedWorkerPool::Worker : public SimpleThread { 221 public: 222 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 223 // around as long as we are running. 224 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 225 int thread_number, 226 const std::string& thread_name_prefix); 227 virtual ~Worker(); 228 229 // SimpleThread implementation. This actually runs the background thread. 
230 virtual void Run() OVERRIDE; 231 232 void set_running_task_info(SequenceToken token, 233 WorkerShutdown shutdown_behavior) { 234 running_sequence_ = token; 235 running_shutdown_behavior_ = shutdown_behavior; 236 } 237 238 SequenceToken running_sequence() const { 239 return running_sequence_; 240 } 241 242 WorkerShutdown running_shutdown_behavior() const { 243 return running_shutdown_behavior_; 244 } 245 246 private: 247 scoped_refptr<SequencedWorkerPool> worker_pool_; 248 SequenceToken running_sequence_; 249 WorkerShutdown running_shutdown_behavior_; 250 251 DISALLOW_COPY_AND_ASSIGN(Worker); 252}; 253 254// Inner ---------------------------------------------------------------------- 255 256class SequencedWorkerPool::Inner { 257 public: 258 // Take a raw pointer to |worker| to avoid cycles (since we're owned 259 // by it). 260 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 261 const std::string& thread_name_prefix, 262 TestingObserver* observer); 263 264 ~Inner(); 265 266 SequenceToken GetSequenceToken(); 267 268 SequenceToken GetNamedSequenceToken(const std::string& name); 269 270 // This function accepts a name and an ID. If the name is null, the 271 // token ID is used. This allows us to implement the optional name lookup 272 // from a single function without having to enter the lock a separate time. 273 bool PostTask(const std::string* optional_token_name, 274 SequenceToken sequence_token, 275 WorkerShutdown shutdown_behavior, 276 const tracked_objects::Location& from_here, 277 const Closure& task, 278 TimeDelta delay); 279 280 bool RunsTasksOnCurrentThread() const; 281 282 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 283 284 void CleanupForTesting(); 285 286 void SignalHasWorkForTesting(); 287 288 int GetWorkSignalCountForTesting() const; 289 290 void Shutdown(int max_blocking_tasks_after_shutdown); 291 292 // Runs the worker loop on the background thread. 
293 void ThreadLoop(Worker* this_worker); 294 295 private: 296 enum GetWorkStatus { 297 GET_WORK_FOUND, 298 GET_WORK_NOT_FOUND, 299 GET_WORK_WAIT, 300 }; 301 302 enum CleanupState { 303 CLEANUP_REQUESTED, 304 CLEANUP_STARTING, 305 CLEANUP_RUNNING, 306 CLEANUP_FINISHING, 307 CLEANUP_DONE, 308 }; 309 310 // Called from within the lock, this converts the given token name into a 311 // token ID, creating a new one if necessary. 312 int LockedGetNamedTokenID(const std::string& name); 313 314 // Called from within the lock, this returns the next sequence task number. 315 int64 LockedGetNextSequenceTaskNumber(); 316 317 // Called from within the lock, returns the shutdown behavior of the task 318 // running on the currently executing worker thread. If invoked from a thread 319 // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. 320 WorkerShutdown LockedCurrentThreadShutdownBehavior() const; 321 322 // Gets new task. There are 3 cases depending on the return value: 323 // 324 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 325 // be run immediately. 326 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, 327 // and |task| is not filled in. In this case, the caller should wait until 328 // a task is posted. 329 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run 330 // immediately, and |task| is not filled in. Likewise, |wait_time| is 331 // filled in the time to wait until the next task to run. In this case, the 332 // caller should wait the time. 333 // 334 // In any case, the calling code should clear the given 335 // delete_these_outside_lock vector the next time the lock is released. 336 // See the implementation for a more detailed description. 337 GetWorkStatus GetWork(SequencedTask* task, 338 TimeDelta* wait_time, 339 std::vector<Closure>* delete_these_outside_lock); 340 341 void HandleCleanup(); 342 343 // Peforms init and cleanup around running the given task. WillRun... 
344 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 345 // The calling code should call FinishStartingAdditionalThread once the 346 // lock is released if the return values is nonzero. 347 int WillRunWorkerTask(const SequencedTask& task); 348 void DidRunWorkerTask(const SequencedTask& task); 349 350 // Returns true if there are no threads currently running the given 351 // sequence token. 352 bool IsSequenceTokenRunnable(int sequence_token_id) const; 353 354 // Checks if all threads are busy and the addition of one more could run an 355 // additional task waiting in the queue. This must be called from within 356 // the lock. 357 // 358 // If another thread is helpful, this will mark the thread as being in the 359 // process of starting and returns the index of the new thread which will be 360 // 0 or more. The caller should then call FinishStartingAdditionalThread to 361 // complete initialization once the lock is released. 362 // 363 // If another thread is not necessary, returne 0; 364 // 365 // See the implementedion for more. 366 int PrepareToStartAdditionalThreadIfHelpful(); 367 368 // The second part of thread creation after 369 // PrepareToStartAdditionalThreadIfHelpful with the thread number it 370 // generated. This actually creates the thread and should be called outside 371 // the lock to avoid blocking important work starting a thread in the lock. 372 void FinishStartingAdditionalThread(int thread_number); 373 374 // Signal |has_work_| and increment |has_work_signal_count_|. 375 void SignalHasWork(); 376 377 // Checks whether there is work left that's blocking shutdown. Must be 378 // called inside the lock. 379 bool CanShutdown() const; 380 381 SequencedWorkerPool* const worker_pool_; 382 383 // The last sequence number used. Managed by GetSequenceToken, since this 384 // only does threadsafe increment operations, you do not need to hold the 385 // lock. 
386 volatile subtle::Atomic32 last_sequence_number_; 387 388 // This lock protects |everything in this class|. Do not read or modify 389 // anything without holding this lock. Do not block while holding this 390 // lock. 391 mutable Lock lock_; 392 393 // Condition variable that is waited on by worker threads until new 394 // tasks are posted or shutdown starts. 395 ConditionVariable has_work_cv_; 396 397 // Condition variable that is waited on by non-worker threads (in 398 // Shutdown()) until CanShutdown() goes to true. 399 ConditionVariable can_shutdown_cv_; 400 401 // The maximum number of worker threads we'll create. 402 const size_t max_threads_; 403 404 const std::string thread_name_prefix_; 405 406 // Associates all known sequence token names with their IDs. 407 std::map<std::string, int> named_sequence_tokens_; 408 409 // Owning pointers to all threads we've created so far, indexed by 410 // ID. Since we lazily create threads, this may be less than 411 // max_threads_ and will be initially empty. 412 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap; 413 ThreadMap threads_; 414 415 // Set to true when we're in the process of creating another thread. 416 // See PrepareToStartAdditionalThreadIfHelpful for more. 417 bool thread_being_created_; 418 419 // Number of threads currently waiting for work. 420 size_t waiting_thread_count_; 421 422 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN 423 // or SKIP_ON_SHUTDOWN flag set. 424 size_t blocking_shutdown_thread_count_; 425 426 // A set of all pending tasks in time-to-run order. These are tasks that are 427 // either waiting for a thread to run on, waiting for their time to run, 428 // or blocked on a previous task in their sequence. We have to iterate over 429 // the tasks by time-to-run order, so we use the set instead of the 430 // traditional priority_queue. 
431 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet; 432 PendingTaskSet pending_tasks_; 433 434 // The next sequence number for a new sequenced task. 435 int64 next_sequence_task_number_; 436 437 // Number of tasks in the pending_tasks_ list that are marked as blocking 438 // shutdown. 439 size_t blocking_shutdown_pending_task_count_; 440 441 // Lists all sequence tokens currently executing. 442 std::set<int> current_sequences_; 443 444 // An ID for each posted task to distinguish the task from others in traces. 445 int trace_id_; 446 447 // Set when Shutdown is called and no further tasks should be 448 // allowed, though we may still be running existing tasks. 449 bool shutdown_called_; 450 451 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() 452 // has been called. 453 int max_blocking_tasks_after_shutdown_; 454 455 // State used to cleanup for testing, all guarded by lock_. 456 CleanupState cleanup_state_; 457 size_t cleanup_idlers_; 458 ConditionVariable cleanup_cv_; 459 460 TestingObserver* const testing_observer_; 461 462 DISALLOW_COPY_AND_ASSIGN(Inner); 463}; 464 465// Worker definitions --------------------------------------------------------- 466 467SequencedWorkerPool::Worker::Worker( 468 const scoped_refptr<SequencedWorkerPool>& worker_pool, 469 int thread_number, 470 const std::string& prefix) 471 : SimpleThread( 472 prefix + StringPrintf("Worker%d", thread_number).c_str()), 473 worker_pool_(worker_pool), 474 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { 475 Start(); 476} 477 478SequencedWorkerPool::Worker::~Worker() { 479} 480 481void SequencedWorkerPool::Worker::Run() { 482 // Just jump back to the Inner object to run the thread, since it has all the 483 // tracking information and queues. 
It might be more natural to implement 484 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 485 // having these worker objects at all, but that method lacks the ability to 486 // send thread-specific information easily to the thread loop. 487 worker_pool_->inner_->ThreadLoop(this); 488 // Release our cyclic reference once we're done. 489 worker_pool_ = NULL; 490} 491 492// Inner definitions --------------------------------------------------------- 493 494SequencedWorkerPool::Inner::Inner( 495 SequencedWorkerPool* worker_pool, 496 size_t max_threads, 497 const std::string& thread_name_prefix, 498 TestingObserver* observer) 499 : worker_pool_(worker_pool), 500 last_sequence_number_(0), 501 lock_(), 502 has_work_cv_(&lock_), 503 can_shutdown_cv_(&lock_), 504 max_threads_(max_threads), 505 thread_name_prefix_(thread_name_prefix), 506 thread_being_created_(false), 507 waiting_thread_count_(0), 508 blocking_shutdown_thread_count_(0), 509 next_sequence_task_number_(0), 510 blocking_shutdown_pending_task_count_(0), 511 trace_id_(0), 512 shutdown_called_(false), 513 max_blocking_tasks_after_shutdown_(0), 514 cleanup_state_(CLEANUP_DONE), 515 cleanup_idlers_(0), 516 cleanup_cv_(&lock_), 517 testing_observer_(observer) {} 518 519SequencedWorkerPool::Inner::~Inner() { 520 // You must call Shutdown() before destroying the pool. 521 DCHECK(shutdown_called_); 522 523 // Need to explicitly join with the threads before they're destroyed or else 524 // they will be running when our object is half torn down. 
525 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 526 it->second->Join(); 527 threads_.clear(); 528 529 if (testing_observer_) 530 testing_observer_->OnDestruct(); 531} 532 533SequencedWorkerPool::SequenceToken 534SequencedWorkerPool::Inner::GetSequenceToken() { 535 subtle::Atomic32 result = 536 subtle::NoBarrier_AtomicIncrement(&last_sequence_number_, 1); 537 return SequenceToken(static_cast<int>(result)); 538} 539 540SequencedWorkerPool::SequenceToken 541SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 542 AutoLock lock(lock_); 543 return SequenceToken(LockedGetNamedTokenID(name)); 544} 545 546bool SequencedWorkerPool::Inner::PostTask( 547 const std::string* optional_token_name, 548 SequenceToken sequence_token, 549 WorkerShutdown shutdown_behavior, 550 const tracked_objects::Location& from_here, 551 const Closure& task, 552 TimeDelta delay) { 553 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); 554 SequencedTask sequenced(from_here); 555 sequenced.sequence_token_id = sequence_token.id_; 556 sequenced.shutdown_behavior = shutdown_behavior; 557 sequenced.posted_from = from_here; 558 sequenced.task = 559 shutdown_behavior == BLOCK_SHUTDOWN ? 560 base::MakeCriticalClosure(task) : task; 561 sequenced.time_to_run = TimeTicks::Now() + delay; 562 563 int create_thread_id = 0; 564 { 565 AutoLock lock(lock_); 566 if (shutdown_called_) { 567 if (shutdown_behavior != BLOCK_SHUTDOWN || 568 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { 569 return false; 570 } 571 if (max_blocking_tasks_after_shutdown_ <= 0) { 572 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; 573 return false; 574 } 575 max_blocking_tasks_after_shutdown_ -= 1; 576 } 577 578 // The trace_id is used for identifying the task in about:tracing. 
579 sequenced.trace_id = trace_id_++; 580 581 TRACE_EVENT_FLOW_BEGIN0("task", "SequencedWorkerPool::PostTask", 582 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this)))); 583 584 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 585 586 // Now that we have the lock, apply the named token rules. 587 if (optional_token_name) 588 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 589 590 pending_tasks_.insert(sequenced); 591 if (shutdown_behavior == BLOCK_SHUTDOWN) 592 blocking_shutdown_pending_task_count_++; 593 594 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 595 } 596 597 // Actually start the additional thread or signal an existing one now that 598 // we're outside the lock. 599 if (create_thread_id) 600 FinishStartingAdditionalThread(create_thread_id); 601 else 602 SignalHasWork(); 603 604 return true; 605} 606 607bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 608 AutoLock lock(lock_); 609 return ContainsKey(threads_, PlatformThread::CurrentId()); 610} 611 612bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 613 SequenceToken sequence_token) const { 614 AutoLock lock(lock_); 615 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 616 if (found == threads_.end()) 617 return false; 618 return found->second->running_sequence().Equals(sequence_token); 619} 620 621// See https://code.google.com/p/chromium/issues/detail?id=168415 622void SequencedWorkerPool::Inner::CleanupForTesting() { 623 DCHECK(!RunsTasksOnCurrentThread()); 624 base::ThreadRestrictions::ScopedAllowWait allow_wait; 625 AutoLock lock(lock_); 626 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 627 if (shutdown_called_) 628 return; 629 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 630 return; 631 cleanup_state_ = CLEANUP_REQUESTED; 632 cleanup_idlers_ = 0; 633 has_work_cv_.Signal(); 634 while (cleanup_state_ != CLEANUP_DONE) 635 cleanup_cv_.Wait(); 
636} 637 638void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 639 SignalHasWork(); 640} 641 642void SequencedWorkerPool::Inner::Shutdown( 643 int max_new_blocking_tasks_after_shutdown) { 644 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 645 { 646 AutoLock lock(lock_); 647 // Cleanup and Shutdown should not be called concurrently. 648 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 649 if (shutdown_called_) 650 return; 651 shutdown_called_ = true; 652 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 653 654 // Tickle the threads. This will wake up a waiting one so it will know that 655 // it can exit, which in turn will wake up any other waiting ones. 656 SignalHasWork(); 657 658 // There are no pending or running tasks blocking shutdown, we're done. 659 if (CanShutdown()) 660 return; 661 } 662 663 // If we're here, then something is blocking shutdown. So wait for 664 // CanShutdown() to go to true. 665 666 if (testing_observer_) 667 testing_observer_->WillWaitForShutdown(); 668 669 TimeTicks shutdown_wait_begin = TimeTicks::Now(); 670 671 { 672 base::ThreadRestrictions::ScopedAllowWait allow_wait; 673 AutoLock lock(lock_); 674 while (!CanShutdown()) 675 can_shutdown_cv_.Wait(); 676 } 677 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 678 TimeTicks::Now() - shutdown_wait_begin); 679} 680 681void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 682 { 683 AutoLock lock(lock_); 684 DCHECK(thread_being_created_); 685 thread_being_created_ = false; 686 std::pair<ThreadMap::iterator, bool> result = 687 threads_.insert( 688 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); 689 DCHECK(result.second); 690 691 while (true) { 692#if defined(OS_MACOSX) 693 base::mac::ScopedNSAutoreleasePool autorelease_pool; 694#endif 695 696 HandleCleanup(); 697 698 // See GetWork for what delete_these_outside_lock is doing. 
699 SequencedTask task; 700 TimeDelta wait_time; 701 std::vector<Closure> delete_these_outside_lock; 702 GetWorkStatus status = 703 GetWork(&task, &wait_time, &delete_these_outside_lock); 704 if (status == GET_WORK_FOUND) { 705 TRACE_EVENT_FLOW_END0("task", "SequencedWorkerPool::PostTask", 706 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); 707 TRACE_EVENT2("task", "SequencedWorkerPool::ThreadLoop", 708 "src_file", task.posted_from.file_name(), 709 "src_func", task.posted_from.function_name()); 710 int new_thread_id = WillRunWorkerTask(task); 711 { 712 AutoUnlock unlock(lock_); 713 // There may be more work available, so wake up another 714 // worker thread. (Technically not required, since we 715 // already get a signal for each new task, but it doesn't 716 // hurt.) 717 SignalHasWork(); 718 delete_these_outside_lock.clear(); 719 720 // Complete thread creation outside the lock if necessary. 721 if (new_thread_id) 722 FinishStartingAdditionalThread(new_thread_id); 723 724 this_worker->set_running_task_info( 725 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 726 727 tracked_objects::TrackedTime start_time = 728 tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally); 729 730 task.task.Run(); 731 732 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task, 733 start_time, tracked_objects::ThreadData::NowForEndOfRun()); 734 735 // Make sure our task is erased outside the lock for the 736 // same reason we do this with delete_these_oustide_lock. 737 // Also, do it before calling set_running_task_info() so 738 // that sequence-checking from within the task's destructor 739 // still works. 740 task.task = Closure(); 741 742 this_worker->set_running_task_info( 743 SequenceToken(), CONTINUE_ON_SHUTDOWN); 744 } 745 DidRunWorkerTask(task); // Must be done inside the lock. 
746 } else if (cleanup_state_ == CLEANUP_RUNNING) { 747 switch (status) { 748 case GET_WORK_WAIT: { 749 AutoUnlock unlock(lock_); 750 delete_these_outside_lock.clear(); 751 } 752 break; 753 case GET_WORK_NOT_FOUND: 754 CHECK(delete_these_outside_lock.empty()); 755 cleanup_state_ = CLEANUP_FINISHING; 756 cleanup_cv_.Broadcast(); 757 break; 758 default: 759 NOTREACHED(); 760 } 761 } else { 762 // When we're terminating and there's no more work, we can 763 // shut down, other workers can complete any pending or new tasks. 764 // We can get additional tasks posted after shutdown_called_ is set 765 // but only worker threads are allowed to post tasks at that time, and 766 // the workers responsible for posting those tasks will be available 767 // to run them. Also, there may be some tasks stuck behind running 768 // ones with the same sequence token, but additional threads won't 769 // help this case. 770 if (shutdown_called_ && 771 blocking_shutdown_pending_task_count_ == 0) 772 break; 773 waiting_thread_count_++; 774 775 switch (status) { 776 case GET_WORK_NOT_FOUND: 777 has_work_cv_.Wait(); 778 break; 779 case GET_WORK_WAIT: 780 has_work_cv_.TimedWait(wait_time); 781 break; 782 default: 783 NOTREACHED(); 784 } 785 waiting_thread_count_--; 786 } 787 } 788 } // Release lock_. 789 790 // We noticed we should exit. Wake up the next worker so it knows it should 791 // exit as well (because the Shutdown() code only signals once). 792 SignalHasWork(); 793 794 // Possibly unblock shutdown. 795 can_shutdown_cv_.Signal(); 796} 797 798void SequencedWorkerPool::Inner::HandleCleanup() { 799 lock_.AssertAcquired(); 800 if (cleanup_state_ == CLEANUP_DONE) 801 return; 802 if (cleanup_state_ == CLEANUP_REQUESTED) { 803 // We win, we get to do the cleanup as soon as the others wise up and idle. 
804 cleanup_state_ = CLEANUP_STARTING; 805 while (thread_being_created_ || 806 cleanup_idlers_ != threads_.size() - 1) { 807 has_work_cv_.Signal(); 808 cleanup_cv_.Wait(); 809 } 810 cleanup_state_ = CLEANUP_RUNNING; 811 return; 812 } 813 if (cleanup_state_ == CLEANUP_STARTING) { 814 // Another worker thread is cleaning up, we idle here until thats done. 815 ++cleanup_idlers_; 816 cleanup_cv_.Broadcast(); 817 while (cleanup_state_ != CLEANUP_FINISHING) { 818 cleanup_cv_.Wait(); 819 } 820 --cleanup_idlers_; 821 cleanup_cv_.Broadcast(); 822 return; 823 } 824 if (cleanup_state_ == CLEANUP_FINISHING) { 825 // We wait for all idlers to wake up prior to being DONE. 826 while (cleanup_idlers_ != 0) { 827 cleanup_cv_.Broadcast(); 828 cleanup_cv_.Wait(); 829 } 830 if (cleanup_state_ == CLEANUP_FINISHING) { 831 cleanup_state_ = CLEANUP_DONE; 832 cleanup_cv_.Signal(); 833 } 834 return; 835 } 836} 837 838int SequencedWorkerPool::Inner::LockedGetNamedTokenID( 839 const std::string& name) { 840 lock_.AssertAcquired(); 841 DCHECK(!name.empty()); 842 843 std::map<std::string, int>::const_iterator found = 844 named_sequence_tokens_.find(name); 845 if (found != named_sequence_tokens_.end()) 846 return found->second; // Got an existing one. 847 848 // Create a new one for this name. 849 SequenceToken result = GetSequenceToken(); 850 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 851 return result.id_; 852} 853 854int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 855 lock_.AssertAcquired(); 856 // We assume that we never create enough tasks to wrap around. 
857 return next_sequence_task_number_++; 858} 859 860SequencedWorkerPool::WorkerShutdown 861SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { 862 lock_.AssertAcquired(); 863 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 864 if (found == threads_.end()) 865 return CONTINUE_ON_SHUTDOWN; 866 return found->second->running_shutdown_behavior(); 867} 868 869SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 870 SequencedTask* task, 871 TimeDelta* wait_time, 872 std::vector<Closure>* delete_these_outside_lock) { 873 lock_.AssertAcquired(); 874 875 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 876 static_cast<int>(pending_tasks_.size())); 877 878 // Find the next task with a sequence token that's not currently in use. 879 // If the token is in use, that means another thread is running something 880 // in that sequence, and we can't run it without going out-of-order. 881 // 882 // This algorithm is simple and fair, but inefficient in some cases. For 883 // example, say somebody schedules 1000 slow tasks with the same sequence 884 // number. We'll have to go through all those tasks each time we feel like 885 // there might be work to schedule. If this proves to be a problem, we 886 // should make this more efficient. 887 // 888 // One possible enhancement would be to keep a map from sequence ID to a 889 // list of pending but currently blocked SequencedTasks for that ID. 890 // When a worker finishes a task of one sequence token, it can pick up the 891 // next one from that token right away. 892 // 893 // This may lead to starvation if there are sufficient numbers of sequences 894 // in use. To alleviate this, we could add an incrementing priority counter 895 // to each SequencedTask. Then maintain a priority_queue of all runnable 896 // tasks, sorted by priority counter. 
When a sequenced task is completed 897 // we would pop the head element off of that tasks pending list and add it 898 // to the priority queue. Then we would run the first item in the priority 899 // queue. 900 901 GetWorkStatus status = GET_WORK_NOT_FOUND; 902 int unrunnable_tasks = 0; 903 PendingTaskSet::iterator i = pending_tasks_.begin(); 904 // We assume that the loop below doesn't take too long and so we can just do 905 // a single call to TimeTicks::Now(). 906 const TimeTicks current_time = TimeTicks::Now(); 907 while (i != pending_tasks_.end()) { 908 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 909 unrunnable_tasks++; 910 ++i; 911 continue; 912 } 913 914 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 915 // We're shutting down and the task we just found isn't blocking 916 // shutdown. Delete it and get more work. 917 // 918 // Note that we do not want to delete unrunnable tasks. Deleting a task 919 // can have side effects (like freeing some objects) and deleting a 920 // task that's supposed to run after one that's currently running could 921 // cause an obscure crash. 922 // 923 // We really want to delete these tasks outside the lock in case the 924 // closures are holding refs to objects that want to post work from 925 // their destructorss (which would deadlock). The closures are 926 // internally refcounted, so we just need to keep a copy of them alive 927 // until the lock is exited. The calling code can just clear() the 928 // vector they passed to us once the lock is exited to make this 929 // happen. 930 delete_these_outside_lock->push_back(i->task); 931 pending_tasks_.erase(i++); 932 continue; 933 } 934 935 if (i->time_to_run > current_time) { 936 // The time to run has not come yet. 937 *wait_time = i->time_to_run - current_time; 938 status = GET_WORK_WAIT; 939 if (cleanup_state_ == CLEANUP_RUNNING) { 940 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. 
941 delete_these_outside_lock->push_back(i->task); 942 pending_tasks_.erase(i); 943 } 944 break; 945 } 946 947 // Found a runnable task. 948 *task = *i; 949 pending_tasks_.erase(i); 950 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 951 blocking_shutdown_pending_task_count_--; 952 } 953 954 status = GET_WORK_FOUND; 955 break; 956 } 957 958 // Track the number of tasks we had to skip over to see if we should be 959 // making this more efficient. If this number ever becomes large or is 960 // frequently "some", we should consider the optimization above. 961 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", 962 unrunnable_tasks); 963 return status; 964} 965 966int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 967 lock_.AssertAcquired(); 968 969 // Mark the task's sequence number as in use. 970 if (task.sequence_token_id) 971 current_sequences_.insert(task.sequence_token_id); 972 973 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 974 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 975 // completes. 976 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) 977 blocking_shutdown_thread_count_++; 978 979 // We just picked up a task. Since StartAdditionalThreadIfHelpful only 980 // creates a new thread if there is no free one, there is a race when posting 981 // tasks that many tasks could have been posted before a thread started 982 // running them, so only one thread would have been created. So we also check 983 // whether we should create more threads after removing our task from the 984 // queue, which also has the nice side effect of creating the workers from 985 // background threads rather than the main thread of the app. 986 // 987 // If another thread wasn't created, we want to wake up an existing thread 988 // if there is one waiting to pick up the next task. 989 // 990 // Note that we really need to do this *before* running the task, not 991 // after. 
Otherwise, if more than one task is posted, the creation of the 992 // second thread (since we only create one at a time) will be blocked by 993 // the execution of the first task, which could be arbitrarily long. 994 return PrepareToStartAdditionalThreadIfHelpful(); 995} 996 997void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 998 lock_.AssertAcquired(); 999 1000 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1001 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1002 blocking_shutdown_thread_count_--; 1003 } 1004 1005 if (task.sequence_token_id) 1006 current_sequences_.erase(task.sequence_token_id); 1007} 1008 1009bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1010 int sequence_token_id) const { 1011 lock_.AssertAcquired(); 1012 return !sequence_token_id || 1013 current_sequences_.find(sequence_token_id) == 1014 current_sequences_.end(); 1015} 1016 1017int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1018 lock_.AssertAcquired(); 1019 // How thread creation works: 1020 // 1021 // We'de like to avoid creating threads with the lock held. However, we 1022 // need to be sure that we have an accurate accounting of the threads for 1023 // proper Joining and deltion on shutdown. 1024 // 1025 // We need to figure out if we need another thread with the lock held, which 1026 // is what this function does. It then marks us as in the process of creating 1027 // a thread. When we do shutdown, we wait until the thread_being_created_ 1028 // flag is cleared, which ensures that the new thread is properly added to 1029 // all the data structures and we can't leak it. Once shutdown starts, we'll 1030 // refuse to create more threads or they would be leaked. 1031 // 1032 // Note that this creates a mostly benign race condition on shutdown that 1033 // will cause fewer workers to be created than one would expect. It isn't 1034 // much of an issue in real life, but affects some tests. 
Since we only spawn 1035 // one worker at a time, the following sequence of events can happen: 1036 // 1037 // 1. Main thread posts a bunch of unrelated tasks that would normally be 1038 // run on separate threads. 1039 // 2. The first task post causes us to start a worker. Other tasks do not 1040 // cause a worker to start since one is pending. 1041 // 3. Main thread initiates shutdown. 1042 // 4. No more threads are created since the shutdown_called_ flag is set. 1043 // 1044 // The result is that one may expect that max_threads_ workers to be created 1045 // given the workload, but in reality fewer may be created because the 1046 // sequence of thread creation on the background threads is racing with the 1047 // shutdown call. 1048 if (!shutdown_called_ && 1049 !thread_being_created_ && 1050 cleanup_state_ == CLEANUP_DONE && 1051 threads_.size() < max_threads_ && 1052 waiting_thread_count_ == 0) { 1053 // We could use an additional thread if there's work to be done. 1054 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); 1055 i != pending_tasks_.end(); ++i) { 1056 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 1057 // Found a runnable task, mark the thread as being started. 1058 thread_being_created_ = true; 1059 return static_cast<int>(threads_.size() + 1); 1060 } 1061 } 1062 } 1063 return 0; 1064} 1065 1066void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1067 int thread_number) { 1068 // Called outside of the lock. 1069 DCHECK(thread_number > 0); 1070 1071 // The worker is assigned to the list when the thread actually starts, which 1072 // will manage the memory of the pointer. 
1073 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1074} 1075 1076void SequencedWorkerPool::Inner::SignalHasWork() { 1077 has_work_cv_.Signal(); 1078 if (testing_observer_) { 1079 testing_observer_->OnHasWork(); 1080 } 1081} 1082 1083bool SequencedWorkerPool::Inner::CanShutdown() const { 1084 lock_.AssertAcquired(); 1085 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1086 return !thread_being_created_ && 1087 blocking_shutdown_thread_count_ == 0 && 1088 blocking_shutdown_pending_task_count_ == 0; 1089} 1090 1091// SequencedWorkerPool -------------------------------------------------------- 1092 1093SequencedWorkerPool::SequencedWorkerPool( 1094 size_t max_threads, 1095 const std::string& thread_name_prefix) 1096 : constructor_message_loop_(MessageLoopProxy::current()), 1097 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1098} 1099 1100SequencedWorkerPool::SequencedWorkerPool( 1101 size_t max_threads, 1102 const std::string& thread_name_prefix, 1103 TestingObserver* observer) 1104 : constructor_message_loop_(MessageLoopProxy::current()), 1105 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1106} 1107 1108SequencedWorkerPool::~SequencedWorkerPool() {} 1109 1110void SequencedWorkerPool::OnDestruct() const { 1111 DCHECK(constructor_message_loop_.get()); 1112 // Avoid deleting ourselves on a worker thread (which would 1113 // deadlock). 
1114 if (RunsTasksOnCurrentThread()) { 1115 constructor_message_loop_->DeleteSoon(FROM_HERE, this); 1116 } else { 1117 delete this; 1118 } 1119} 1120 1121SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1122 return inner_->GetSequenceToken(); 1123} 1124 1125SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1126 const std::string& name) { 1127 return inner_->GetNamedSequenceToken(name); 1128} 1129 1130scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1131 SequenceToken token) { 1132 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1133} 1134 1135scoped_refptr<SequencedTaskRunner> 1136SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( 1137 SequenceToken token, WorkerShutdown shutdown_behavior) { 1138 return new SequencedWorkerPoolSequencedTaskRunner( 1139 this, token, shutdown_behavior); 1140} 1141 1142scoped_refptr<TaskRunner> 1143SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( 1144 WorkerShutdown shutdown_behavior) { 1145 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior); 1146} 1147 1148bool SequencedWorkerPool::PostWorkerTask( 1149 const tracked_objects::Location& from_here, 1150 const Closure& task) { 1151 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, 1152 from_here, task, TimeDelta()); 1153} 1154 1155bool SequencedWorkerPool::PostDelayedWorkerTask( 1156 const tracked_objects::Location& from_here, 1157 const Closure& task, 1158 TimeDelta delay) { 1159 WorkerShutdown shutdown_behavior = 1160 delay == TimeDelta() ? 
BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1161 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1162 from_here, task, delay); 1163} 1164 1165bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 1166 const tracked_objects::Location& from_here, 1167 const Closure& task, 1168 WorkerShutdown shutdown_behavior) { 1169 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1170 from_here, task, TimeDelta()); 1171} 1172 1173bool SequencedWorkerPool::PostSequencedWorkerTask( 1174 SequenceToken sequence_token, 1175 const tracked_objects::Location& from_here, 1176 const Closure& task) { 1177 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, 1178 from_here, task, TimeDelta()); 1179} 1180 1181bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( 1182 SequenceToken sequence_token, 1183 const tracked_objects::Location& from_here, 1184 const Closure& task, 1185 TimeDelta delay) { 1186 WorkerShutdown shutdown_behavior = 1187 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1188 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1189 from_here, task, delay); 1190} 1191 1192bool SequencedWorkerPool::PostNamedSequencedWorkerTask( 1193 const std::string& token_name, 1194 const tracked_objects::Location& from_here, 1195 const Closure& task) { 1196 DCHECK(!token_name.empty()); 1197 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, 1198 from_here, task, TimeDelta()); 1199} 1200 1201bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 1202 SequenceToken sequence_token, 1203 const tracked_objects::Location& from_here, 1204 const Closure& task, 1205 WorkerShutdown shutdown_behavior) { 1206 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1207 from_here, task, TimeDelta()); 1208} 1209 1210bool SequencedWorkerPool::PostDelayedTask( 1211 const tracked_objects::Location& from_here, 1212 const Closure& task, 1213 TimeDelta delay) { 1214 return 
PostDelayedWorkerTask(from_here, task, delay); 1215} 1216 1217bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1218 return inner_->RunsTasksOnCurrentThread(); 1219} 1220 1221bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1222 SequenceToken sequence_token) const { 1223 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1224} 1225 1226void SequencedWorkerPool::FlushForTesting() { 1227 inner_->CleanupForTesting(); 1228} 1229 1230void SequencedWorkerPool::SignalHasWorkForTesting() { 1231 inner_->SignalHasWorkForTesting(); 1232} 1233 1234void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1235 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 1236 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1237} 1238 1239} // namespace base 1240