sequenced_worker_pool.cc revision 5d1f7b1de12d16ceb2c938c56701a3e8bfa558f7
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "base/threading/sequenced_worker_pool.h" 6 7#include <list> 8#include <map> 9#include <set> 10#include <utility> 11#include <vector> 12 13#include "base/atomic_sequence_num.h" 14#include "base/callback.h" 15#include "base/compiler_specific.h" 16#include "base/critical_closure.h" 17#include "base/debug/trace_event.h" 18#include "base/lazy_instance.h" 19#include "base/logging.h" 20#include "base/memory/linked_ptr.h" 21#include "base/message_loop/message_loop_proxy.h" 22#include "base/stl_util.h" 23#include "base/strings/stringprintf.h" 24#include "base/synchronization/condition_variable.h" 25#include "base/synchronization/lock.h" 26#include "base/threading/platform_thread.h" 27#include "base/threading/simple_thread.h" 28#include "base/threading/thread_local.h" 29#include "base/threading/thread_restrictions.h" 30#include "base/time/time.h" 31#include "base/tracked_objects.h" 32 33#if defined(OS_MACOSX) 34#include "base/mac/scoped_nsautorelease_pool.h" 35#endif 36 37#if !defined(OS_NACL) 38#include "base/metrics/histogram.h" 39#endif 40 41namespace base { 42 43namespace { 44 45struct SequencedTask : public TrackingInfo { 46 SequencedTask() 47 : sequence_token_id(0), 48 trace_id(0), 49 sequence_task_number(0), 50 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 51 52 explicit SequencedTask(const tracked_objects::Location& from_here) 53 : base::TrackingInfo(from_here, TimeTicks()), 54 sequence_token_id(0), 55 trace_id(0), 56 sequence_task_number(0), 57 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 58 59 ~SequencedTask() {} 60 61 int sequence_token_id; 62 int trace_id; 63 int64 sequence_task_number; 64 SequencedWorkerPool::WorkerShutdown shutdown_behavior; 65 tracked_objects::Location posted_from; 66 Closure task; 67 68 // Non-delayed tasks and delayed tasks are managed together by time-to-run 69 // order. We calculate the time by adding the posted time and the given delay. 70 TimeTicks time_to_run; 71}; 72 73struct SequencedTaskLessThan { 74 public: 75 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { 76 if (lhs.time_to_run < rhs.time_to_run) 77 return true; 78 79 if (lhs.time_to_run > rhs.time_to_run) 80 return false; 81 82 // If the time happen to match, then we use the sequence number to decide. 83 return lhs.sequence_task_number < rhs.sequence_task_number; 84 } 85}; 86 87// SequencedWorkerPoolTaskRunner --------------------------------------------- 88// A TaskRunner which posts tasks to a SequencedWorkerPool with a 89// fixed ShutdownBehavior. 90// 91// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 92class SequencedWorkerPoolTaskRunner : public TaskRunner { 93 public: 94 SequencedWorkerPoolTaskRunner( 95 const scoped_refptr<SequencedWorkerPool>& pool, 96 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 97 98 // TaskRunner implementation 99 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 100 const Closure& task, 101 TimeDelta delay) OVERRIDE; 102 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 103 104 private: 105 virtual ~SequencedWorkerPoolTaskRunner(); 106 107 const scoped_refptr<SequencedWorkerPool> pool_; 108 109 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 110 111 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); 112}; 113 114SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( 115 const scoped_refptr<SequencedWorkerPool>& pool, 116 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 117 : pool_(pool), 118 shutdown_behavior_(shutdown_behavior) { 119} 120 121SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { 122} 123 124bool SequencedWorkerPoolTaskRunner::PostDelayedTask( 125 const tracked_objects::Location& from_here, 126 const Closure& task, 127 TimeDelta delay) { 128 if (delay == TimeDelta()) { 129 return pool_->PostWorkerTaskWithShutdownBehavior( 130 from_here, task, shutdown_behavior_); 131 } 132 return pool_->PostDelayedWorkerTask(from_here, task, delay); 133} 134 135bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { 136 return pool_->RunsTasksOnCurrentThread(); 137} 138 139// SequencedWorkerPoolSequencedTaskRunner ------------------------------------ 140// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a 141// fixed sequence token. 142// 143// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 144class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { 145 public: 146 SequencedWorkerPoolSequencedTaskRunner( 147 const scoped_refptr<SequencedWorkerPool>& pool, 148 SequencedWorkerPool::SequenceToken token, 149 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 150 151 // TaskRunner implementation 152 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 153 const Closure& task, 154 TimeDelta delay) OVERRIDE; 155 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 156 157 // SequencedTaskRunner implementation 158 virtual bool PostNonNestableDelayedTask( 159 const tracked_objects::Location& from_here, 160 const Closure& task, 161 TimeDelta delay) OVERRIDE; 162 163 private: 164 virtual ~SequencedWorkerPoolSequencedTaskRunner(); 165 166 const scoped_refptr<SequencedWorkerPool> pool_; 167 168 const SequencedWorkerPool::SequenceToken token_; 169 170 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 171 172 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner); 173}; 174 175SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner( 176 const scoped_refptr<SequencedWorkerPool>& pool, 177 SequencedWorkerPool::SequenceToken token, 178 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 179 : pool_(pool), 180 token_(token), 181 shutdown_behavior_(shutdown_behavior) { 182} 183 184SequencedWorkerPoolSequencedTaskRunner:: 185~SequencedWorkerPoolSequencedTaskRunner() { 186} 187 188bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( 189 const tracked_objects::Location& from_here, 190 const Closure& task, 191 TimeDelta delay) { 192 if (delay == TimeDelta()) { 193 return pool_->PostSequencedWorkerTaskWithShutdownBehavior( 194 token_, from_here, task, shutdown_behavior_); 195 } 196 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay); 197} 198 199bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const { 200 return pool_->IsRunningSequenceOnCurrentThread(token_); 201} 202 203bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask( 204 const tracked_objects::Location& from_here, 205 const Closure& task, 206 TimeDelta delay) { 207 // There's no way to run nested tasks, so simply forward to 208 // PostDelayedTask. 209 return PostDelayedTask(from_here, task, delay); 210} 211 212// Create a process-wide unique ID to represent this task in trace events. This 213// will be mangled with a Process ID hash to reduce the likelyhood of colliding 214// with MessageLoop pointers on other processes. 215uint64 GetTaskTraceID(const SequencedTask& task, 216 void* pool) { 217 return (static_cast<uint64>(task.trace_id) << 32) | 218 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); 219} 220 221base::LazyInstance<base::ThreadLocalPointer< 222 SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr = 223 LAZY_INSTANCE_INITIALIZER; 224 225} // namespace 226 227// Worker --------------------------------------------------------------------- 228 229class SequencedWorkerPool::Worker : public SimpleThread { 230 public: 231 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 232 // around as long as we are running. 233 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 234 int thread_number, 235 const std::string& thread_name_prefix); 236 virtual ~Worker(); 237 238 // SimpleThread implementation. This actually runs the background thread. 239 virtual void Run() OVERRIDE; 240 241 void set_running_task_info(SequenceToken token, 242 WorkerShutdown shutdown_behavior) { 243 running_sequence_ = token; 244 running_shutdown_behavior_ = shutdown_behavior; 245 } 246 247 SequenceToken running_sequence() const { 248 return running_sequence_; 249 } 250 251 WorkerShutdown running_shutdown_behavior() const { 252 return running_shutdown_behavior_; 253 } 254 255 private: 256 scoped_refptr<SequencedWorkerPool> worker_pool_; 257 SequenceToken running_sequence_; 258 WorkerShutdown running_shutdown_behavior_; 259 260 DISALLOW_COPY_AND_ASSIGN(Worker); 261}; 262 263// Inner ---------------------------------------------------------------------- 264 265class SequencedWorkerPool::Inner { 266 public: 267 // Take a raw pointer to |worker| to avoid cycles (since we're owned 268 // by it). 269 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 270 const std::string& thread_name_prefix, 271 TestingObserver* observer); 272 273 ~Inner(); 274 275 SequenceToken GetSequenceToken(); 276 277 SequenceToken GetNamedSequenceToken(const std::string& name); 278 279 // This function accepts a name and an ID. If the name is null, the 280 // token ID is used. This allows us to implement the optional name lookup 281 // from a single function without having to enter the lock a separate time. 282 bool PostTask(const std::string* optional_token_name, 283 SequenceToken sequence_token, 284 WorkerShutdown shutdown_behavior, 285 const tracked_objects::Location& from_here, 286 const Closure& task, 287 TimeDelta delay); 288 289 bool RunsTasksOnCurrentThread() const; 290 291 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 292 293 void CleanupForTesting(); 294 295 void SignalHasWorkForTesting(); 296 297 int GetWorkSignalCountForTesting() const; 298 299 void Shutdown(int max_blocking_tasks_after_shutdown); 300 301 bool IsShutdownInProgress(); 302 303 // Runs the worker loop on the background thread. 304 void ThreadLoop(Worker* this_worker); 305 306 private: 307 enum GetWorkStatus { 308 GET_WORK_FOUND, 309 GET_WORK_NOT_FOUND, 310 GET_WORK_WAIT, 311 }; 312 313 enum CleanupState { 314 CLEANUP_REQUESTED, 315 CLEANUP_STARTING, 316 CLEANUP_RUNNING, 317 CLEANUP_FINISHING, 318 CLEANUP_DONE, 319 }; 320 321 // Called from within the lock, this converts the given token name into a 322 // token ID, creating a new one if necessary. 323 int LockedGetNamedTokenID(const std::string& name); 324 325 // Called from within the lock, this returns the next sequence task number. 326 int64 LockedGetNextSequenceTaskNumber(); 327 328 // Called from within the lock, returns the shutdown behavior of the task 329 // running on the currently executing worker thread. If invoked from a thread 330 // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. 331 WorkerShutdown LockedCurrentThreadShutdownBehavior() const; 332 333 // Gets new task. There are 3 cases depending on the return value: 334 // 335 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 336 // be run immediately. 337 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, 338 // and |task| is not filled in. In this case, the caller should wait until 339 // a task is posted. 340 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run 341 // immediately, and |task| is not filled in. Likewise, |wait_time| is 342 // filled in the time to wait until the next task to run. In this case, the 343 // caller should wait the time. 344 // 345 // In any case, the calling code should clear the given 346 // delete_these_outside_lock vector the next time the lock is released. 347 // See the implementation for a more detailed description. 348 GetWorkStatus GetWork(SequencedTask* task, 349 TimeDelta* wait_time, 350 std::vector<Closure>* delete_these_outside_lock); 351 352 void HandleCleanup(); 353 354 // Peforms init and cleanup around running the given task. WillRun... 355 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 356 // The calling code should call FinishStartingAdditionalThread once the 357 // lock is released if the return values is nonzero. 358 int WillRunWorkerTask(const SequencedTask& task); 359 void DidRunWorkerTask(const SequencedTask& task); 360 361 // Returns true if there are no threads currently running the given 362 // sequence token. 363 bool IsSequenceTokenRunnable(int sequence_token_id) const; 364 365 // Checks if all threads are busy and the addition of one more could run an 366 // additional task waiting in the queue. This must be called from within 367 // the lock. 368 // 369 // If another thread is helpful, this will mark the thread as being in the 370 // process of starting and returns the index of the new thread which will be 371 // 0 or more. The caller should then call FinishStartingAdditionalThread to 372 // complete initialization once the lock is released. 373 // 374 // If another thread is not necessary, returne 0; 375 // 376 // See the implementedion for more. 377 int PrepareToStartAdditionalThreadIfHelpful(); 378 379 // The second part of thread creation after 380 // PrepareToStartAdditionalThreadIfHelpful with the thread number it 381 // generated. This actually creates the thread and should be called outside 382 // the lock to avoid blocking important work starting a thread in the lock. 383 void FinishStartingAdditionalThread(int thread_number); 384 385 // Signal |has_work_| and increment |has_work_signal_count_|. 386 void SignalHasWork(); 387 388 // Checks whether there is work left that's blocking shutdown. Must be 389 // called inside the lock. 390 bool CanShutdown() const; 391 392 SequencedWorkerPool* const worker_pool_; 393 394 // The last sequence number used. Managed by GetSequenceToken, since this 395 // only does threadsafe increment operations, you do not need to hold the 396 // lock. This is class-static to make SequenceTokens issued by 397 // GetSequenceToken unique across SequencedWorkerPool instances. 398 static base::StaticAtomicSequenceNumber g_last_sequence_number_; 399 400 // This lock protects |everything in this class|. Do not read or modify 401 // anything without holding this lock. Do not block while holding this 402 // lock. 403 mutable Lock lock_; 404 405 // Condition variable that is waited on by worker threads until new 406 // tasks are posted or shutdown starts. 407 ConditionVariable has_work_cv_; 408 409 // Condition variable that is waited on by non-worker threads (in 410 // Shutdown()) until CanShutdown() goes to true. 411 ConditionVariable can_shutdown_cv_; 412 413 // The maximum number of worker threads we'll create. 414 const size_t max_threads_; 415 416 const std::string thread_name_prefix_; 417 418 // Associates all known sequence token names with their IDs. 419 std::map<std::string, int> named_sequence_tokens_; 420 421 // Owning pointers to all threads we've created so far, indexed by 422 // ID. Since we lazily create threads, this may be less than 423 // max_threads_ and will be initially empty. 424 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap; 425 ThreadMap threads_; 426 427 // Set to true when we're in the process of creating another thread. 428 // See PrepareToStartAdditionalThreadIfHelpful for more. 429 bool thread_being_created_; 430 431 // Number of threads currently waiting for work. 432 size_t waiting_thread_count_; 433 434 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN 435 // or SKIP_ON_SHUTDOWN flag set. 436 size_t blocking_shutdown_thread_count_; 437 438 // A set of all pending tasks in time-to-run order. These are tasks that are 439 // either waiting for a thread to run on, waiting for their time to run, 440 // or blocked on a previous task in their sequence. We have to iterate over 441 // the tasks by time-to-run order, so we use the set instead of the 442 // traditional priority_queue. 443 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet; 444 PendingTaskSet pending_tasks_; 445 446 // The next sequence number for a new sequenced task. 447 int64 next_sequence_task_number_; 448 449 // Number of tasks in the pending_tasks_ list that are marked as blocking 450 // shutdown. 451 size_t blocking_shutdown_pending_task_count_; 452 453 // Lists all sequence tokens currently executing. 454 std::set<int> current_sequences_; 455 456 // An ID for each posted task to distinguish the task from others in traces. 457 int trace_id_; 458 459 // Set when Shutdown is called and no further tasks should be 460 // allowed, though we may still be running existing tasks. 461 bool shutdown_called_; 462 463 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() 464 // has been called. 465 int max_blocking_tasks_after_shutdown_; 466 467 // State used to cleanup for testing, all guarded by lock_. 468 CleanupState cleanup_state_; 469 size_t cleanup_idlers_; 470 ConditionVariable cleanup_cv_; 471 472 TestingObserver* const testing_observer_; 473 474 DISALLOW_COPY_AND_ASSIGN(Inner); 475}; 476 477// Worker definitions --------------------------------------------------------- 478 479SequencedWorkerPool::Worker::Worker( 480 const scoped_refptr<SequencedWorkerPool>& worker_pool, 481 int thread_number, 482 const std::string& prefix) 483 : SimpleThread( 484 prefix + StringPrintf("Worker%d", thread_number).c_str()), 485 worker_pool_(worker_pool), 486 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { 487 Start(); 488} 489 490SequencedWorkerPool::Worker::~Worker() { 491} 492 493void SequencedWorkerPool::Worker::Run() { 494 // Store a pointer to the running sequence in thread local storage for 495 // static function access. 496 g_lazy_tls_ptr.Get().Set(&running_sequence_); 497 498 // Just jump back to the Inner object to run the thread, since it has all the 499 // tracking information and queues. It might be more natural to implement 500 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 501 // having these worker objects at all, but that method lacks the ability to 502 // send thread-specific information easily to the thread loop. 503 worker_pool_->inner_->ThreadLoop(this); 504 // Release our cyclic reference once we're done. 505 worker_pool_ = NULL; 506} 507 508// Inner definitions --------------------------------------------------------- 509 510SequencedWorkerPool::Inner::Inner( 511 SequencedWorkerPool* worker_pool, 512 size_t max_threads, 513 const std::string& thread_name_prefix, 514 TestingObserver* observer) 515 : worker_pool_(worker_pool), 516 lock_(), 517 has_work_cv_(&lock_), 518 can_shutdown_cv_(&lock_), 519 max_threads_(max_threads), 520 thread_name_prefix_(thread_name_prefix), 521 thread_being_created_(false), 522 waiting_thread_count_(0), 523 blocking_shutdown_thread_count_(0), 524 next_sequence_task_number_(0), 525 blocking_shutdown_pending_task_count_(0), 526 trace_id_(0), 527 shutdown_called_(false), 528 max_blocking_tasks_after_shutdown_(0), 529 cleanup_state_(CLEANUP_DONE), 530 cleanup_idlers_(0), 531 cleanup_cv_(&lock_), 532 testing_observer_(observer) {} 533 534SequencedWorkerPool::Inner::~Inner() { 535 // You must call Shutdown() before destroying the pool. 536 DCHECK(shutdown_called_); 537 538 // Need to explicitly join with the threads before they're destroyed or else 539 // they will be running when our object is half torn down. 540 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 541 it->second->Join(); 542 threads_.clear(); 543 544 if (testing_observer_) 545 testing_observer_->OnDestruct(); 546} 547 548SequencedWorkerPool::SequenceToken 549SequencedWorkerPool::Inner::GetSequenceToken() { 550 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 551 // is used as a sentinel value in SequenceTokens. 552 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 553} 554 555SequencedWorkerPool::SequenceToken 556SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 557 AutoLock lock(lock_); 558 return SequenceToken(LockedGetNamedTokenID(name)); 559} 560 561bool SequencedWorkerPool::Inner::PostTask( 562 const std::string* optional_token_name, 563 SequenceToken sequence_token, 564 WorkerShutdown shutdown_behavior, 565 const tracked_objects::Location& from_here, 566 const Closure& task, 567 TimeDelta delay) { 568 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); 569 SequencedTask sequenced(from_here); 570 sequenced.sequence_token_id = sequence_token.id_; 571 sequenced.shutdown_behavior = shutdown_behavior; 572 sequenced.posted_from = from_here; 573 sequenced.task = 574 shutdown_behavior == BLOCK_SHUTDOWN ? 575 base::MakeCriticalClosure(task) : task; 576 sequenced.time_to_run = TimeTicks::Now() + delay; 577 578 int create_thread_id = 0; 579 { 580 AutoLock lock(lock_); 581 if (shutdown_called_) { 582 if (shutdown_behavior != BLOCK_SHUTDOWN || 583 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { 584 return false; 585 } 586 if (max_blocking_tasks_after_shutdown_ <= 0) { 587 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; 588 return false; 589 } 590 max_blocking_tasks_after_shutdown_ -= 1; 591 } 592 593 // The trace_id is used for identifying the task in about:tracing. 594 sequenced.trace_id = trace_id_++; 595 596 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 597 "SequencedWorkerPool::PostTask", 598 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this)))); 599 600 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 601 602 // Now that we have the lock, apply the named token rules. 603 if (optional_token_name) 604 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 605 606 pending_tasks_.insert(sequenced); 607 if (shutdown_behavior == BLOCK_SHUTDOWN) 608 blocking_shutdown_pending_task_count_++; 609 610 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 611 } 612 613 // Actually start the additional thread or signal an existing one now that 614 // we're outside the lock. 615 if (create_thread_id) 616 FinishStartingAdditionalThread(create_thread_id); 617 else 618 SignalHasWork(); 619 620 return true; 621} 622 623bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 624 AutoLock lock(lock_); 625 return ContainsKey(threads_, PlatformThread::CurrentId()); 626} 627 628bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 629 SequenceToken sequence_token) const { 630 AutoLock lock(lock_); 631 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 632 if (found == threads_.end()) 633 return false; 634 return sequence_token.Equals(found->second->running_sequence()); 635} 636 637// See https://code.google.com/p/chromium/issues/detail?id=168415 638void SequencedWorkerPool::Inner::CleanupForTesting() { 639 DCHECK(!RunsTasksOnCurrentThread()); 640 base::ThreadRestrictions::ScopedAllowWait allow_wait; 641 AutoLock lock(lock_); 642 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 643 if (shutdown_called_) 644 return; 645 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 646 return; 647 cleanup_state_ = CLEANUP_REQUESTED; 648 cleanup_idlers_ = 0; 649 has_work_cv_.Signal(); 650 while (cleanup_state_ != CLEANUP_DONE) 651 cleanup_cv_.Wait(); 652} 653 654void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 655 SignalHasWork(); 656} 657 658void SequencedWorkerPool::Inner::Shutdown( 659 int max_new_blocking_tasks_after_shutdown) { 660 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 661 { 662 AutoLock lock(lock_); 663 // Cleanup and Shutdown should not be called concurrently. 664 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 665 if (shutdown_called_) 666 return; 667 shutdown_called_ = true; 668 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 669 670 // Tickle the threads. This will wake up a waiting one so it will know that 671 // it can exit, which in turn will wake up any other waiting ones. 672 SignalHasWork(); 673 674 // There are no pending or running tasks blocking shutdown, we're done. 675 if (CanShutdown()) 676 return; 677 } 678 679 // If we're here, then something is blocking shutdown. So wait for 680 // CanShutdown() to go to true. 681 682 if (testing_observer_) 683 testing_observer_->WillWaitForShutdown(); 684 685#if !defined(OS_NACL) 686 TimeTicks shutdown_wait_begin = TimeTicks::Now(); 687#endif 688 689 { 690 base::ThreadRestrictions::ScopedAllowWait allow_wait; 691 AutoLock lock(lock_); 692 while (!CanShutdown()) 693 can_shutdown_cv_.Wait(); 694 } 695#if !defined(OS_NACL) 696 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 697 TimeTicks::Now() - shutdown_wait_begin); 698#endif 699} 700 701bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 702 AutoLock lock(lock_); 703 return shutdown_called_; 704} 705 706void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 707 { 708 AutoLock lock(lock_); 709 DCHECK(thread_being_created_); 710 thread_being_created_ = false; 711 std::pair<ThreadMap::iterator, bool> result = 712 threads_.insert( 713 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); 714 DCHECK(result.second); 715 716 while (true) { 717#if defined(OS_MACOSX) 718 base::mac::ScopedNSAutoreleasePool autorelease_pool; 719#endif 720 721 HandleCleanup(); 722 723 // See GetWork for what delete_these_outside_lock is doing. 724 SequencedTask task; 725 TimeDelta wait_time; 726 std::vector<Closure> delete_these_outside_lock; 727 GetWorkStatus status = 728 GetWork(&task, &wait_time, &delete_these_outside_lock); 729 if (status == GET_WORK_FOUND) { 730 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 731 "SequencedWorkerPool::PostTask", 732 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); 733 TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop", 734 "src_file", task.posted_from.file_name(), 735 "src_func", task.posted_from.function_name()); 736 int new_thread_id = WillRunWorkerTask(task); 737 { 738 AutoUnlock unlock(lock_); 739 // There may be more work available, so wake up another 740 // worker thread. (Technically not required, since we 741 // already get a signal for each new task, but it doesn't 742 // hurt.) 743 SignalHasWork(); 744 delete_these_outside_lock.clear(); 745 746 // Complete thread creation outside the lock if necessary. 747 if (new_thread_id) 748 FinishStartingAdditionalThread(new_thread_id); 749 750 this_worker->set_running_task_info( 751 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 752 753 tracked_objects::TrackedTime start_time = 754 tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally); 755 756 task.task.Run(); 757 758 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task, 759 start_time, tracked_objects::ThreadData::NowForEndOfRun()); 760 761 // Make sure our task is erased outside the lock for the 762 // same reason we do this with delete_these_oustide_lock. 763 // Also, do it before calling set_running_task_info() so 764 // that sequence-checking from within the task's destructor 765 // still works. 766 task.task = Closure(); 767 768 this_worker->set_running_task_info( 769 SequenceToken(), CONTINUE_ON_SHUTDOWN); 770 } 771 DidRunWorkerTask(task); // Must be done inside the lock. 772 } else if (cleanup_state_ == CLEANUP_RUNNING) { 773 switch (status) { 774 case GET_WORK_WAIT: { 775 AutoUnlock unlock(lock_); 776 delete_these_outside_lock.clear(); 777 } 778 break; 779 case GET_WORK_NOT_FOUND: 780 CHECK(delete_these_outside_lock.empty()); 781 cleanup_state_ = CLEANUP_FINISHING; 782 cleanup_cv_.Broadcast(); 783 break; 784 default: 785 NOTREACHED(); 786 } 787 } else { 788 // When we're terminating and there's no more work, we can 789 // shut down, other workers can complete any pending or new tasks. 790 // We can get additional tasks posted after shutdown_called_ is set 791 // but only worker threads are allowed to post tasks at that time, and 792 // the workers responsible for posting those tasks will be available 793 // to run them. Also, there may be some tasks stuck behind running 794 // ones with the same sequence token, but additional threads won't 795 // help this case. 796 if (shutdown_called_ && 797 blocking_shutdown_pending_task_count_ == 0) 798 break; 799 waiting_thread_count_++; 800 801 switch (status) { 802 case GET_WORK_NOT_FOUND: 803 has_work_cv_.Wait(); 804 break; 805 case GET_WORK_WAIT: 806 has_work_cv_.TimedWait(wait_time); 807 break; 808 default: 809 NOTREACHED(); 810 } 811 waiting_thread_count_--; 812 } 813 } 814 } // Release lock_. 815 816 // We noticed we should exit. Wake up the next worker so it knows it should 817 // exit as well (because the Shutdown() code only signals once). 818 SignalHasWork(); 819 820 // Possibly unblock shutdown. 821 can_shutdown_cv_.Signal(); 822} 823 824void SequencedWorkerPool::Inner::HandleCleanup() { 825 lock_.AssertAcquired(); 826 if (cleanup_state_ == CLEANUP_DONE) 827 return; 828 if (cleanup_state_ == CLEANUP_REQUESTED) { 829 // We win, we get to do the cleanup as soon as the others wise up and idle. 830 cleanup_state_ = CLEANUP_STARTING; 831 while (thread_being_created_ || 832 cleanup_idlers_ != threads_.size() - 1) { 833 has_work_cv_.Signal(); 834 cleanup_cv_.Wait(); 835 } 836 cleanup_state_ = CLEANUP_RUNNING; 837 return; 838 } 839 if (cleanup_state_ == CLEANUP_STARTING) { 840 // Another worker thread is cleaning up, we idle here until thats done. 841 ++cleanup_idlers_; 842 cleanup_cv_.Broadcast(); 843 while (cleanup_state_ != CLEANUP_FINISHING) { 844 cleanup_cv_.Wait(); 845 } 846 --cleanup_idlers_; 847 cleanup_cv_.Broadcast(); 848 return; 849 } 850 if (cleanup_state_ == CLEANUP_FINISHING) { 851 // We wait for all idlers to wake up prior to being DONE. 852 while (cleanup_idlers_ != 0) { 853 cleanup_cv_.Broadcast(); 854 cleanup_cv_.Wait(); 855 } 856 if (cleanup_state_ == CLEANUP_FINISHING) { 857 cleanup_state_ = CLEANUP_DONE; 858 cleanup_cv_.Signal(); 859 } 860 return; 861 } 862} 863 864int SequencedWorkerPool::Inner::LockedGetNamedTokenID( 865 const std::string& name) { 866 lock_.AssertAcquired(); 867 DCHECK(!name.empty()); 868 869 std::map<std::string, int>::const_iterator found = 870 named_sequence_tokens_.find(name); 871 if (found != named_sequence_tokens_.end()) 872 return found->second; // Got an existing one. 873 874 // Create a new one for this name. 875 SequenceToken result = GetSequenceToken(); 876 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 877 return result.id_; 878} 879 880int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 881 lock_.AssertAcquired(); 882 // We assume that we never create enough tasks to wrap around. 883 return next_sequence_task_number_++; 884} 885 886SequencedWorkerPool::WorkerShutdown 887SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { 888 lock_.AssertAcquired(); 889 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 890 if (found == threads_.end()) 891 return CONTINUE_ON_SHUTDOWN; 892 return found->second->running_shutdown_behavior(); 893} 894 895SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 896 SequencedTask* task, 897 TimeDelta* wait_time, 898 std::vector<Closure>* delete_these_outside_lock) { 899 lock_.AssertAcquired(); 900 901#if !defined(OS_NACL) 902 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 903 static_cast<int>(pending_tasks_.size())); 904#endif 905 906 // Find the next task with a sequence token that's not currently in use. 907 // If the token is in use, that means another thread is running something 908 // in that sequence, and we can't run it without going out-of-order. 909 // 910 // This algorithm is simple and fair, but inefficient in some cases. For 911 // example, say somebody schedules 1000 slow tasks with the same sequence 912 // number. We'll have to go through all those tasks each time we feel like 913 // there might be work to schedule. If this proves to be a problem, we 914 // should make this more efficient. 915 // 916 // One possible enhancement would be to keep a map from sequence ID to a 917 // list of pending but currently blocked SequencedTasks for that ID. 918 // When a worker finishes a task of one sequence token, it can pick up the 919 // next one from that token right away. 920 // 921 // This may lead to starvation if there are sufficient numbers of sequences 922 // in use. To alleviate this, we could add an incrementing priority counter 923 // to each SequencedTask. Then maintain a priority_queue of all runnable 924 // tasks, sorted by priority counter. When a sequenced task is completed 925 // we would pop the head element off of that tasks pending list and add it 926 // to the priority queue. Then we would run the first item in the priority 927 // queue. 928 929 GetWorkStatus status = GET_WORK_NOT_FOUND; 930 int unrunnable_tasks = 0; 931 PendingTaskSet::iterator i = pending_tasks_.begin(); 932 // We assume that the loop below doesn't take too long and so we can just do 933 // a single call to TimeTicks::Now(). 934 const TimeTicks current_time = TimeTicks::Now(); 935 while (i != pending_tasks_.end()) { 936 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 937 unrunnable_tasks++; 938 ++i; 939 continue; 940 } 941 942 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 943 // We're shutting down and the task we just found isn't blocking 944 // shutdown. Delete it and get more work. 945 // 946 // Note that we do not want to delete unrunnable tasks. Deleting a task 947 // can have side effects (like freeing some objects) and deleting a 948 // task that's supposed to run after one that's currently running could 949 // cause an obscure crash. 950 // 951 // We really want to delete these tasks outside the lock in case the 952 // closures are holding refs to objects that want to post work from 953 // their destructorss (which would deadlock). The closures are 954 // internally refcounted, so we just need to keep a copy of them alive 955 // until the lock is exited. The calling code can just clear() the 956 // vector they passed to us once the lock is exited to make this 957 // happen. 958 delete_these_outside_lock->push_back(i->task); 959 pending_tasks_.erase(i++); 960 continue; 961 } 962 963 if (i->time_to_run > current_time) { 964 // The time to run has not come yet. 965 *wait_time = i->time_to_run - current_time; 966 status = GET_WORK_WAIT; 967 if (cleanup_state_ == CLEANUP_RUNNING) { 968 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. 969 delete_these_outside_lock->push_back(i->task); 970 pending_tasks_.erase(i); 971 } 972 break; 973 } 974 975 // Found a runnable task. 976 *task = *i; 977 pending_tasks_.erase(i); 978 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 979 blocking_shutdown_pending_task_count_--; 980 } 981 982 status = GET_WORK_FOUND; 983 break; 984 } 985 986 // Track the number of tasks we had to skip over to see if we should be 987 // making this more efficient. If this number ever becomes large or is 988 // frequently "some", we should consider the optimization above. 989#if !defined(OS_NACL) 990 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", 991 unrunnable_tasks); 992#endif 993 return status; 994} 995 996int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 997 lock_.AssertAcquired(); 998 999 // Mark the task's sequence number as in use. 1000 if (task.sequence_token_id) 1001 current_sequences_.insert(task.sequence_token_id); 1002 1003 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1004 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1005 // completes. 1006 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) 1007 blocking_shutdown_thread_count_++; 1008 1009 // We just picked up a task. Since StartAdditionalThreadIfHelpful only 1010 // creates a new thread if there is no free one, there is a race when posting 1011 // tasks that many tasks could have been posted before a thread started 1012 // running them, so only one thread would have been created. So we also check 1013 // whether we should create more threads after removing our task from the 1014 // queue, which also has the nice side effect of creating the workers from 1015 // background threads rather than the main thread of the app. 1016 // 1017 // If another thread wasn't created, we want to wake up an existing thread 1018 // if there is one waiting to pick up the next task. 1019 // 1020 // Note that we really need to do this *before* running the task, not 1021 // after. Otherwise, if more than one task is posted, the creation of the 1022 // second thread (since we only create one at a time) will be blocked by 1023 // the execution of the first task, which could be arbitrarily long. 1024 return PrepareToStartAdditionalThreadIfHelpful(); 1025} 1026 1027void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1028 lock_.AssertAcquired(); 1029 1030 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1031 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1032 blocking_shutdown_thread_count_--; 1033 } 1034 1035 if (task.sequence_token_id) 1036 current_sequences_.erase(task.sequence_token_id); 1037} 1038 1039bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1040 int sequence_token_id) const { 1041 lock_.AssertAcquired(); 1042 return !sequence_token_id || 1043 current_sequences_.find(sequence_token_id) == 1044 current_sequences_.end(); 1045} 1046 1047int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1048 lock_.AssertAcquired(); 1049 // How thread creation works: 1050 // 1051 // We'de like to avoid creating threads with the lock held. However, we 1052 // need to be sure that we have an accurate accounting of the threads for 1053 // proper Joining and deltion on shutdown. 1054 // 1055 // We need to figure out if we need another thread with the lock held, which 1056 // is what this function does. It then marks us as in the process of creating 1057 // a thread. When we do shutdown, we wait until the thread_being_created_ 1058 // flag is cleared, which ensures that the new thread is properly added to 1059 // all the data structures and we can't leak it. Once shutdown starts, we'll 1060 // refuse to create more threads or they would be leaked. 1061 // 1062 // Note that this creates a mostly benign race condition on shutdown that 1063 // will cause fewer workers to be created than one would expect. It isn't 1064 // much of an issue in real life, but affects some tests. Since we only spawn 1065 // one worker at a time, the following sequence of events can happen: 1066 // 1067 // 1. Main thread posts a bunch of unrelated tasks that would normally be 1068 // run on separate threads. 1069 // 2. The first task post causes us to start a worker. Other tasks do not 1070 // cause a worker to start since one is pending. 1071 // 3. Main thread initiates shutdown. 1072 // 4. No more threads are created since the shutdown_called_ flag is set. 1073 // 1074 // The result is that one may expect that max_threads_ workers to be created 1075 // given the workload, but in reality fewer may be created because the 1076 // sequence of thread creation on the background threads is racing with the 1077 // shutdown call. 1078 if (!shutdown_called_ && 1079 !thread_being_created_ && 1080 cleanup_state_ == CLEANUP_DONE && 1081 threads_.size() < max_threads_ && 1082 waiting_thread_count_ == 0) { 1083 // We could use an additional thread if there's work to be done. 1084 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); 1085 i != pending_tasks_.end(); ++i) { 1086 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 1087 // Found a runnable task, mark the thread as being started. 1088 thread_being_created_ = true; 1089 return static_cast<int>(threads_.size() + 1); 1090 } 1091 } 1092 } 1093 return 0; 1094} 1095 1096void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1097 int thread_number) { 1098 // Called outside of the lock. 1099 DCHECK(thread_number > 0); 1100 1101 // The worker is assigned to the list when the thread actually starts, which 1102 // will manage the memory of the pointer. 1103 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1104} 1105 1106void SequencedWorkerPool::Inner::SignalHasWork() { 1107 has_work_cv_.Signal(); 1108 if (testing_observer_) { 1109 testing_observer_->OnHasWork(); 1110 } 1111} 1112 1113bool SequencedWorkerPool::Inner::CanShutdown() const { 1114 lock_.AssertAcquired(); 1115 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1116 return !thread_being_created_ && 1117 blocking_shutdown_thread_count_ == 0 && 1118 blocking_shutdown_pending_task_count_ == 0; 1119} 1120 1121base::StaticAtomicSequenceNumber 1122SequencedWorkerPool::Inner::g_last_sequence_number_; 1123 1124// SequencedWorkerPool -------------------------------------------------------- 1125 1126// static 1127SequencedWorkerPool::SequenceToken 1128SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1129 // Don't construct lazy instance on check. 1130 if (g_lazy_tls_ptr == NULL) 1131 return SequenceToken(); 1132 1133 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); 1134 if (!token) 1135 return SequenceToken(); 1136 return *token; 1137} 1138 1139SequencedWorkerPool::SequencedWorkerPool( 1140 size_t max_threads, 1141 const std::string& thread_name_prefix) 1142 : constructor_message_loop_(MessageLoopProxy::current()), 1143 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1144} 1145 1146SequencedWorkerPool::SequencedWorkerPool( 1147 size_t max_threads, 1148 const std::string& thread_name_prefix, 1149 TestingObserver* observer) 1150 : constructor_message_loop_(MessageLoopProxy::current()), 1151 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1152} 1153 1154SequencedWorkerPool::~SequencedWorkerPool() {} 1155 1156void SequencedWorkerPool::OnDestruct() const { 1157 DCHECK(constructor_message_loop_.get()); 1158 // Avoid deleting ourselves on a worker thread (which would 1159 // deadlock). 1160 if (RunsTasksOnCurrentThread()) { 1161 constructor_message_loop_->DeleteSoon(FROM_HERE, this); 1162 } else { 1163 delete this; 1164 } 1165} 1166 1167SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1168 return inner_->GetSequenceToken(); 1169} 1170 1171SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1172 const std::string& name) { 1173 return inner_->GetNamedSequenceToken(name); 1174} 1175 1176scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1177 SequenceToken token) { 1178 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1179} 1180 1181scoped_refptr<SequencedTaskRunner> 1182SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( 1183 SequenceToken token, WorkerShutdown shutdown_behavior) { 1184 return new SequencedWorkerPoolSequencedTaskRunner( 1185 this, token, shutdown_behavior); 1186} 1187 1188scoped_refptr<TaskRunner> 1189SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( 1190 WorkerShutdown shutdown_behavior) { 1191 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior); 1192} 1193 1194bool SequencedWorkerPool::PostWorkerTask( 1195 const tracked_objects::Location& from_here, 1196 const Closure& task) { 1197 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, 1198 from_here, task, TimeDelta()); 1199} 1200 1201bool SequencedWorkerPool::PostDelayedWorkerTask( 1202 const tracked_objects::Location& from_here, 1203 const Closure& task, 1204 TimeDelta delay) { 1205 WorkerShutdown shutdown_behavior = 1206 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1207 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1208 from_here, task, delay); 1209} 1210 1211bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 1212 const tracked_objects::Location& from_here, 1213 const Closure& task, 1214 WorkerShutdown shutdown_behavior) { 1215 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1216 from_here, task, TimeDelta()); 1217} 1218 1219bool SequencedWorkerPool::PostSequencedWorkerTask( 1220 SequenceToken sequence_token, 1221 const tracked_objects::Location& from_here, 1222 const Closure& task) { 1223 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, 1224 from_here, task, TimeDelta()); 1225} 1226 1227bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( 1228 SequenceToken sequence_token, 1229 const tracked_objects::Location& from_here, 1230 const Closure& task, 1231 TimeDelta delay) { 1232 WorkerShutdown shutdown_behavior = 1233 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1234 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1235 from_here, task, delay); 1236} 1237 1238bool SequencedWorkerPool::PostNamedSequencedWorkerTask( 1239 const std::string& token_name, 1240 const tracked_objects::Location& from_here, 1241 const Closure& task) { 1242 DCHECK(!token_name.empty()); 1243 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, 1244 from_here, task, TimeDelta()); 1245} 1246 1247bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 1248 SequenceToken sequence_token, 1249 const tracked_objects::Location& from_here, 1250 const Closure& task, 1251 WorkerShutdown shutdown_behavior) { 1252 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1253 from_here, task, TimeDelta()); 1254} 1255 1256bool SequencedWorkerPool::PostDelayedTask( 1257 const tracked_objects::Location& from_here, 1258 const Closure& task, 1259 TimeDelta delay) { 1260 return PostDelayedWorkerTask(from_here, task, delay); 1261} 1262 1263bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1264 return inner_->RunsTasksOnCurrentThread(); 1265} 1266 1267bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1268 SequenceToken sequence_token) const { 1269 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1270} 1271 1272void SequencedWorkerPool::FlushForTesting() { 1273 inner_->CleanupForTesting(); 1274} 1275 1276void SequencedWorkerPool::SignalHasWorkForTesting() { 1277 inner_->SignalHasWorkForTesting(); 1278} 1279 1280void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1281 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 1282 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1283} 1284 1285bool SequencedWorkerPool::IsShutdownInProgress() { 1286 return inner_->IsShutdownInProgress(); 1287} 1288 1289} // namespace base 1290