sequenced_worker_pool.cc revision 0529e5d033099cbfc42635f6f6183833b09dff6e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "base/threading/sequenced_worker_pool.h" 6 7#include <list> 8#include <map> 9#include <set> 10#include <utility> 11#include <vector> 12 13#include "base/atomic_sequence_num.h" 14#include "base/callback.h" 15#include "base/compiler_specific.h" 16#include "base/critical_closure.h" 17#include "base/debug/trace_event.h" 18#include "base/lazy_instance.h" 19#include "base/logging.h" 20#include "base/memory/linked_ptr.h" 21#include "base/message_loop/message_loop_proxy.h" 22#include "base/stl_util.h" 23#include "base/strings/stringprintf.h" 24#include "base/synchronization/condition_variable.h" 25#include "base/synchronization/lock.h" 26#include "base/threading/platform_thread.h" 27#include "base/threading/simple_thread.h" 28#include "base/threading/thread_local.h" 29#include "base/threading/thread_restrictions.h" 30#include "base/time/time.h" 31#include "base/tracked_objects.h" 32 33#if defined(OS_MACOSX) 34#include "base/mac/scoped_nsautorelease_pool.h" 35#elif defined(OS_WIN) 36#include "base/win/scoped_com_initializer.h" 37#endif 38 39#if !defined(OS_NACL) 40#include "base/metrics/histogram.h" 41#endif 42 43namespace base { 44 45namespace { 46 47struct SequencedTask : public TrackingInfo { 48 SequencedTask() 49 : sequence_token_id(0), 50 trace_id(0), 51 sequence_task_number(0), 52 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 53 54 explicit SequencedTask(const tracked_objects::Location& from_here) 55 : base::TrackingInfo(from_here, TimeTicks()), 56 sequence_token_id(0), 57 trace_id(0), 58 sequence_task_number(0), 59 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {} 60 61 ~SequencedTask() {} 62 63 int sequence_token_id; 64 int trace_id; 65 int64 sequence_task_number; 66 SequencedWorkerPool::WorkerShutdown shutdown_behavior; 67 tracked_objects::Location posted_from; 68 Closure task; 69 70 // Non-delayed tasks and delayed tasks are managed together by time-to-run 71 // order. We calculate the time by adding the posted time and the given delay. 72 TimeTicks time_to_run; 73}; 74 75struct SequencedTaskLessThan { 76 public: 77 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const { 78 if (lhs.time_to_run < rhs.time_to_run) 79 return true; 80 81 if (lhs.time_to_run > rhs.time_to_run) 82 return false; 83 84 // If the time happen to match, then we use the sequence number to decide. 85 return lhs.sequence_task_number < rhs.sequence_task_number; 86 } 87}; 88 89// SequencedWorkerPoolTaskRunner --------------------------------------------- 90// A TaskRunner which posts tasks to a SequencedWorkerPool with a 91// fixed ShutdownBehavior. 92// 93// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 94class SequencedWorkerPoolTaskRunner : public TaskRunner { 95 public: 96 SequencedWorkerPoolTaskRunner( 97 const scoped_refptr<SequencedWorkerPool>& pool, 98 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 99 100 // TaskRunner implementation 101 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 102 const Closure& task, 103 TimeDelta delay) OVERRIDE; 104 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 105 106 private: 107 virtual ~SequencedWorkerPoolTaskRunner(); 108 109 const scoped_refptr<SequencedWorkerPool> pool_; 110 111 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 112 113 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner); 114}; 115 116SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner( 117 const scoped_refptr<SequencedWorkerPool>& pool, 118 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 119 : pool_(pool), 120 shutdown_behavior_(shutdown_behavior) { 121} 122 123SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() { 124} 125 126bool SequencedWorkerPoolTaskRunner::PostDelayedTask( 127 const tracked_objects::Location& from_here, 128 const Closure& task, 129 TimeDelta delay) { 130 if (delay == TimeDelta()) { 131 return pool_->PostWorkerTaskWithShutdownBehavior( 132 from_here, task, shutdown_behavior_); 133 } 134 return pool_->PostDelayedWorkerTask(from_here, task, delay); 135} 136 137bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const { 138 return pool_->RunsTasksOnCurrentThread(); 139} 140 141// SequencedWorkerPoolSequencedTaskRunner ------------------------------------ 142// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a 143// fixed sequence token. 144// 145// Note that this class is RefCountedThreadSafe (inherited from TaskRunner). 146class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner { 147 public: 148 SequencedWorkerPoolSequencedTaskRunner( 149 const scoped_refptr<SequencedWorkerPool>& pool, 150 SequencedWorkerPool::SequenceToken token, 151 SequencedWorkerPool::WorkerShutdown shutdown_behavior); 152 153 // TaskRunner implementation 154 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 155 const Closure& task, 156 TimeDelta delay) OVERRIDE; 157 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 158 159 // SequencedTaskRunner implementation 160 virtual bool PostNonNestableDelayedTask( 161 const tracked_objects::Location& from_here, 162 const Closure& task, 163 TimeDelta delay) OVERRIDE; 164 165 private: 166 virtual ~SequencedWorkerPoolSequencedTaskRunner(); 167 168 const scoped_refptr<SequencedWorkerPool> pool_; 169 170 const SequencedWorkerPool::SequenceToken token_; 171 172 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_; 173 174 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner); 175}; 176 177SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner( 178 const scoped_refptr<SequencedWorkerPool>& pool, 179 SequencedWorkerPool::SequenceToken token, 180 SequencedWorkerPool::WorkerShutdown shutdown_behavior) 181 : pool_(pool), 182 token_(token), 183 shutdown_behavior_(shutdown_behavior) { 184} 185 186SequencedWorkerPoolSequencedTaskRunner:: 187~SequencedWorkerPoolSequencedTaskRunner() { 188} 189 190bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask( 191 const tracked_objects::Location& from_here, 192 const Closure& task, 193 TimeDelta delay) { 194 if (delay == TimeDelta()) { 195 return pool_->PostSequencedWorkerTaskWithShutdownBehavior( 196 token_, from_here, task, shutdown_behavior_); 197 } 198 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay); 199} 200 201bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const { 202 return pool_->IsRunningSequenceOnCurrentThread(token_); 203} 204 205bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask( 206 const tracked_objects::Location& from_here, 207 const Closure& task, 208 TimeDelta delay) { 209 // There's no way to run nested tasks, so simply forward to 210 // PostDelayedTask. 211 return PostDelayedTask(from_here, task, delay); 212} 213 214// Create a process-wide unique ID to represent this task in trace events. This 215// will be mangled with a Process ID hash to reduce the likelyhood of colliding 216// with MessageLoop pointers on other processes. 217uint64 GetTaskTraceID(const SequencedTask& task, 218 void* pool) { 219 return (static_cast<uint64>(task.trace_id) << 32) | 220 static_cast<uint64>(reinterpret_cast<intptr_t>(pool)); 221} 222 223base::LazyInstance<base::ThreadLocalPointer< 224 SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr = 225 LAZY_INSTANCE_INITIALIZER; 226 227} // namespace 228 229// Worker --------------------------------------------------------------------- 230 231class SequencedWorkerPool::Worker : public SimpleThread { 232 public: 233 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it 234 // around as long as we are running. 235 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool, 236 int thread_number, 237 const std::string& thread_name_prefix); 238 virtual ~Worker(); 239 240 // SimpleThread implementation. This actually runs the background thread. 241 virtual void Run() OVERRIDE; 242 243 void set_running_task_info(SequenceToken token, 244 WorkerShutdown shutdown_behavior) { 245 running_sequence_ = token; 246 running_shutdown_behavior_ = shutdown_behavior; 247 } 248 249 SequenceToken running_sequence() const { 250 return running_sequence_; 251 } 252 253 WorkerShutdown running_shutdown_behavior() const { 254 return running_shutdown_behavior_; 255 } 256 257 private: 258 scoped_refptr<SequencedWorkerPool> worker_pool_; 259 SequenceToken running_sequence_; 260 WorkerShutdown running_shutdown_behavior_; 261 262 DISALLOW_COPY_AND_ASSIGN(Worker); 263}; 264 265// Inner ---------------------------------------------------------------------- 266 267class SequencedWorkerPool::Inner { 268 public: 269 // Take a raw pointer to |worker| to avoid cycles (since we're owned 270 // by it). 271 Inner(SequencedWorkerPool* worker_pool, size_t max_threads, 272 const std::string& thread_name_prefix, 273 TestingObserver* observer); 274 275 ~Inner(); 276 277 SequenceToken GetSequenceToken(); 278 279 SequenceToken GetNamedSequenceToken(const std::string& name); 280 281 // This function accepts a name and an ID. If the name is null, the 282 // token ID is used. This allows us to implement the optional name lookup 283 // from a single function without having to enter the lock a separate time. 284 bool PostTask(const std::string* optional_token_name, 285 SequenceToken sequence_token, 286 WorkerShutdown shutdown_behavior, 287 const tracked_objects::Location& from_here, 288 const Closure& task, 289 TimeDelta delay); 290 291 bool RunsTasksOnCurrentThread() const; 292 293 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 294 295 void CleanupForTesting(); 296 297 void SignalHasWorkForTesting(); 298 299 int GetWorkSignalCountForTesting() const; 300 301 void Shutdown(int max_blocking_tasks_after_shutdown); 302 303 bool IsShutdownInProgress(); 304 305 // Runs the worker loop on the background thread. 306 void ThreadLoop(Worker* this_worker); 307 308 private: 309 enum GetWorkStatus { 310 GET_WORK_FOUND, 311 GET_WORK_NOT_FOUND, 312 GET_WORK_WAIT, 313 }; 314 315 enum CleanupState { 316 CLEANUP_REQUESTED, 317 CLEANUP_STARTING, 318 CLEANUP_RUNNING, 319 CLEANUP_FINISHING, 320 CLEANUP_DONE, 321 }; 322 323 // Called from within the lock, this converts the given token name into a 324 // token ID, creating a new one if necessary. 325 int LockedGetNamedTokenID(const std::string& name); 326 327 // Called from within the lock, this returns the next sequence task number. 328 int64 LockedGetNextSequenceTaskNumber(); 329 330 // Called from within the lock, returns the shutdown behavior of the task 331 // running on the currently executing worker thread. If invoked from a thread 332 // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN. 333 WorkerShutdown LockedCurrentThreadShutdownBehavior() const; 334 335 // Gets new task. There are 3 cases depending on the return value: 336 // 337 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should 338 // be run immediately. 339 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run, 340 // and |task| is not filled in. In this case, the caller should wait until 341 // a task is posted. 342 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run 343 // immediately, and |task| is not filled in. Likewise, |wait_time| is 344 // filled in the time to wait until the next task to run. In this case, the 345 // caller should wait the time. 346 // 347 // In any case, the calling code should clear the given 348 // delete_these_outside_lock vector the next time the lock is released. 349 // See the implementation for a more detailed description. 350 GetWorkStatus GetWork(SequencedTask* task, 351 TimeDelta* wait_time, 352 std::vector<Closure>* delete_these_outside_lock); 353 354 void HandleCleanup(); 355 356 // Peforms init and cleanup around running the given task. WillRun... 357 // returns the value from PrepareToStartAdditionalThreadIfNecessary. 358 // The calling code should call FinishStartingAdditionalThread once the 359 // lock is released if the return values is nonzero. 360 int WillRunWorkerTask(const SequencedTask& task); 361 void DidRunWorkerTask(const SequencedTask& task); 362 363 // Returns true if there are no threads currently running the given 364 // sequence token. 365 bool IsSequenceTokenRunnable(int sequence_token_id) const; 366 367 // Checks if all threads are busy and the addition of one more could run an 368 // additional task waiting in the queue. This must be called from within 369 // the lock. 370 // 371 // If another thread is helpful, this will mark the thread as being in the 372 // process of starting and returns the index of the new thread which will be 373 // 0 or more. The caller should then call FinishStartingAdditionalThread to 374 // complete initialization once the lock is released. 375 // 376 // If another thread is not necessary, returne 0; 377 // 378 // See the implementedion for more. 379 int PrepareToStartAdditionalThreadIfHelpful(); 380 381 // The second part of thread creation after 382 // PrepareToStartAdditionalThreadIfHelpful with the thread number it 383 // generated. This actually creates the thread and should be called outside 384 // the lock to avoid blocking important work starting a thread in the lock. 385 void FinishStartingAdditionalThread(int thread_number); 386 387 // Signal |has_work_| and increment |has_work_signal_count_|. 388 void SignalHasWork(); 389 390 // Checks whether there is work left that's blocking shutdown. Must be 391 // called inside the lock. 392 bool CanShutdown() const; 393 394 SequencedWorkerPool* const worker_pool_; 395 396 // The last sequence number used. Managed by GetSequenceToken, since this 397 // only does threadsafe increment operations, you do not need to hold the 398 // lock. This is class-static to make SequenceTokens issued by 399 // GetSequenceToken unique across SequencedWorkerPool instances. 400 static base::StaticAtomicSequenceNumber g_last_sequence_number_; 401 402 // This lock protects |everything in this class|. Do not read or modify 403 // anything without holding this lock. Do not block while holding this 404 // lock. 405 mutable Lock lock_; 406 407 // Condition variable that is waited on by worker threads until new 408 // tasks are posted or shutdown starts. 409 ConditionVariable has_work_cv_; 410 411 // Condition variable that is waited on by non-worker threads (in 412 // Shutdown()) until CanShutdown() goes to true. 413 ConditionVariable can_shutdown_cv_; 414 415 // The maximum number of worker threads we'll create. 416 const size_t max_threads_; 417 418 const std::string thread_name_prefix_; 419 420 // Associates all known sequence token names with their IDs. 421 std::map<std::string, int> named_sequence_tokens_; 422 423 // Owning pointers to all threads we've created so far, indexed by 424 // ID. Since we lazily create threads, this may be less than 425 // max_threads_ and will be initially empty. 426 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap; 427 ThreadMap threads_; 428 429 // Set to true when we're in the process of creating another thread. 430 // See PrepareToStartAdditionalThreadIfHelpful for more. 431 bool thread_being_created_; 432 433 // Number of threads currently waiting for work. 434 size_t waiting_thread_count_; 435 436 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN 437 // or SKIP_ON_SHUTDOWN flag set. 438 size_t blocking_shutdown_thread_count_; 439 440 // A set of all pending tasks in time-to-run order. These are tasks that are 441 // either waiting for a thread to run on, waiting for their time to run, 442 // or blocked on a previous task in their sequence. We have to iterate over 443 // the tasks by time-to-run order, so we use the set instead of the 444 // traditional priority_queue. 445 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet; 446 PendingTaskSet pending_tasks_; 447 448 // The next sequence number for a new sequenced task. 449 int64 next_sequence_task_number_; 450 451 // Number of tasks in the pending_tasks_ list that are marked as blocking 452 // shutdown. 453 size_t blocking_shutdown_pending_task_count_; 454 455 // Lists all sequence tokens currently executing. 456 std::set<int> current_sequences_; 457 458 // An ID for each posted task to distinguish the task from others in traces. 459 int trace_id_; 460 461 // Set when Shutdown is called and no further tasks should be 462 // allowed, though we may still be running existing tasks. 463 bool shutdown_called_; 464 465 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown() 466 // has been called. 467 int max_blocking_tasks_after_shutdown_; 468 469 // State used to cleanup for testing, all guarded by lock_. 470 CleanupState cleanup_state_; 471 size_t cleanup_idlers_; 472 ConditionVariable cleanup_cv_; 473 474 TestingObserver* const testing_observer_; 475 476 DISALLOW_COPY_AND_ASSIGN(Inner); 477}; 478 479// Worker definitions --------------------------------------------------------- 480 481SequencedWorkerPool::Worker::Worker( 482 const scoped_refptr<SequencedWorkerPool>& worker_pool, 483 int thread_number, 484 const std::string& prefix) 485 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)), 486 worker_pool_(worker_pool), 487 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) { 488 Start(); 489} 490 491SequencedWorkerPool::Worker::~Worker() { 492} 493 494void SequencedWorkerPool::Worker::Run() { 495#if defined(OS_WIN) 496 win::ScopedCOMInitializer com_initializer; 497#endif 498 499 // Store a pointer to the running sequence in thread local storage for 500 // static function access. 501 g_lazy_tls_ptr.Get().Set(&running_sequence_); 502 503 // Just jump back to the Inner object to run the thread, since it has all the 504 // tracking information and queues. It might be more natural to implement 505 // using DelegateSimpleThread and have Inner implement the Delegate to avoid 506 // having these worker objects at all, but that method lacks the ability to 507 // send thread-specific information easily to the thread loop. 508 worker_pool_->inner_->ThreadLoop(this); 509 // Release our cyclic reference once we're done. 510 worker_pool_ = NULL; 511} 512 513// Inner definitions --------------------------------------------------------- 514 515SequencedWorkerPool::Inner::Inner( 516 SequencedWorkerPool* worker_pool, 517 size_t max_threads, 518 const std::string& thread_name_prefix, 519 TestingObserver* observer) 520 : worker_pool_(worker_pool), 521 lock_(), 522 has_work_cv_(&lock_), 523 can_shutdown_cv_(&lock_), 524 max_threads_(max_threads), 525 thread_name_prefix_(thread_name_prefix), 526 thread_being_created_(false), 527 waiting_thread_count_(0), 528 blocking_shutdown_thread_count_(0), 529 next_sequence_task_number_(0), 530 blocking_shutdown_pending_task_count_(0), 531 trace_id_(0), 532 shutdown_called_(false), 533 max_blocking_tasks_after_shutdown_(0), 534 cleanup_state_(CLEANUP_DONE), 535 cleanup_idlers_(0), 536 cleanup_cv_(&lock_), 537 testing_observer_(observer) {} 538 539SequencedWorkerPool::Inner::~Inner() { 540 // You must call Shutdown() before destroying the pool. 541 DCHECK(shutdown_called_); 542 543 // Need to explicitly join with the threads before they're destroyed or else 544 // they will be running when our object is half torn down. 545 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it) 546 it->second->Join(); 547 threads_.clear(); 548 549 if (testing_observer_) 550 testing_observer_->OnDestruct(); 551} 552 553SequencedWorkerPool::SequenceToken 554SequencedWorkerPool::Inner::GetSequenceToken() { 555 // Need to add one because StaticAtomicSequenceNumber starts at zero, which 556 // is used as a sentinel value in SequenceTokens. 557 return SequenceToken(g_last_sequence_number_.GetNext() + 1); 558} 559 560SequencedWorkerPool::SequenceToken 561SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) { 562 AutoLock lock(lock_); 563 return SequenceToken(LockedGetNamedTokenID(name)); 564} 565 566bool SequencedWorkerPool::Inner::PostTask( 567 const std::string* optional_token_name, 568 SequenceToken sequence_token, 569 WorkerShutdown shutdown_behavior, 570 const tracked_objects::Location& from_here, 571 const Closure& task, 572 TimeDelta delay) { 573 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN); 574 SequencedTask sequenced(from_here); 575 sequenced.sequence_token_id = sequence_token.id_; 576 sequenced.shutdown_behavior = shutdown_behavior; 577 sequenced.posted_from = from_here; 578 sequenced.task = 579 shutdown_behavior == BLOCK_SHUTDOWN ? 580 base::MakeCriticalClosure(task) : task; 581 sequenced.time_to_run = TimeTicks::Now() + delay; 582 583 int create_thread_id = 0; 584 { 585 AutoLock lock(lock_); 586 if (shutdown_called_) { 587 if (shutdown_behavior != BLOCK_SHUTDOWN || 588 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) { 589 return false; 590 } 591 if (max_blocking_tasks_after_shutdown_ <= 0) { 592 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed"; 593 return false; 594 } 595 max_blocking_tasks_after_shutdown_ -= 1; 596 } 597 598 // The trace_id is used for identifying the task in about:tracing. 599 sequenced.trace_id = trace_id_++; 600 601 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 602 "SequencedWorkerPool::PostTask", 603 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this)))); 604 605 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber(); 606 607 // Now that we have the lock, apply the named token rules. 608 if (optional_token_name) 609 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name); 610 611 pending_tasks_.insert(sequenced); 612 if (shutdown_behavior == BLOCK_SHUTDOWN) 613 blocking_shutdown_pending_task_count_++; 614 615 create_thread_id = PrepareToStartAdditionalThreadIfHelpful(); 616 } 617 618 // Actually start the additional thread or signal an existing one now that 619 // we're outside the lock. 620 if (create_thread_id) 621 FinishStartingAdditionalThread(create_thread_id); 622 else 623 SignalHasWork(); 624 625 return true; 626} 627 628bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const { 629 AutoLock lock(lock_); 630 return ContainsKey(threads_, PlatformThread::CurrentId()); 631} 632 633bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread( 634 SequenceToken sequence_token) const { 635 AutoLock lock(lock_); 636 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 637 if (found == threads_.end()) 638 return false; 639 return sequence_token.Equals(found->second->running_sequence()); 640} 641 642// See https://code.google.com/p/chromium/issues/detail?id=168415 643void SequencedWorkerPool::Inner::CleanupForTesting() { 644 DCHECK(!RunsTasksOnCurrentThread()); 645 base::ThreadRestrictions::ScopedAllowWait allow_wait; 646 AutoLock lock(lock_); 647 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 648 if (shutdown_called_) 649 return; 650 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size()) 651 return; 652 cleanup_state_ = CLEANUP_REQUESTED; 653 cleanup_idlers_ = 0; 654 has_work_cv_.Signal(); 655 while (cleanup_state_ != CLEANUP_DONE) 656 cleanup_cv_.Wait(); 657} 658 659void SequencedWorkerPool::Inner::SignalHasWorkForTesting() { 660 SignalHasWork(); 661} 662 663void SequencedWorkerPool::Inner::Shutdown( 664 int max_new_blocking_tasks_after_shutdown) { 665 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0); 666 { 667 AutoLock lock(lock_); 668 // Cleanup and Shutdown should not be called concurrently. 669 CHECK_EQ(CLEANUP_DONE, cleanup_state_); 670 if (shutdown_called_) 671 return; 672 shutdown_called_ = true; 673 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown; 674 675 // Tickle the threads. This will wake up a waiting one so it will know that 676 // it can exit, which in turn will wake up any other waiting ones. 677 SignalHasWork(); 678 679 // There are no pending or running tasks blocking shutdown, we're done. 680 if (CanShutdown()) 681 return; 682 } 683 684 // If we're here, then something is blocking shutdown. So wait for 685 // CanShutdown() to go to true. 686 687 if (testing_observer_) 688 testing_observer_->WillWaitForShutdown(); 689 690#if !defined(OS_NACL) 691 TimeTicks shutdown_wait_begin = TimeTicks::Now(); 692#endif 693 694 { 695 base::ThreadRestrictions::ScopedAllowWait allow_wait; 696 AutoLock lock(lock_); 697 while (!CanShutdown()) 698 can_shutdown_cv_.Wait(); 699 } 700#if !defined(OS_NACL) 701 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime", 702 TimeTicks::Now() - shutdown_wait_begin); 703#endif 704} 705 706bool SequencedWorkerPool::Inner::IsShutdownInProgress() { 707 AutoLock lock(lock_); 708 return shutdown_called_; 709} 710 711void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) { 712 { 713 AutoLock lock(lock_); 714 DCHECK(thread_being_created_); 715 thread_being_created_ = false; 716 std::pair<ThreadMap::iterator, bool> result = 717 threads_.insert( 718 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker))); 719 DCHECK(result.second); 720 721 while (true) { 722#if defined(OS_MACOSX) 723 base::mac::ScopedNSAutoreleasePool autorelease_pool; 724#endif 725 726 HandleCleanup(); 727 728 // See GetWork for what delete_these_outside_lock is doing. 729 SequencedTask task; 730 TimeDelta wait_time; 731 std::vector<Closure> delete_these_outside_lock; 732 GetWorkStatus status = 733 GetWork(&task, &wait_time, &delete_these_outside_lock); 734 if (status == GET_WORK_FOUND) { 735 TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"), 736 "SequencedWorkerPool::PostTask", 737 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this)))); 738 TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop", 739 "src_file", task.posted_from.file_name(), 740 "src_func", task.posted_from.function_name()); 741 int new_thread_id = WillRunWorkerTask(task); 742 { 743 AutoUnlock unlock(lock_); 744 // There may be more work available, so wake up another 745 // worker thread. (Technically not required, since we 746 // already get a signal for each new task, but it doesn't 747 // hurt.) 748 SignalHasWork(); 749 delete_these_outside_lock.clear(); 750 751 // Complete thread creation outside the lock if necessary. 752 if (new_thread_id) 753 FinishStartingAdditionalThread(new_thread_id); 754 755 this_worker->set_running_task_info( 756 SequenceToken(task.sequence_token_id), task.shutdown_behavior); 757 758 tracked_objects::TrackedTime start_time = 759 tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally); 760 761 task.task.Run(); 762 763 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task, 764 start_time, tracked_objects::ThreadData::NowForEndOfRun()); 765 766 // Make sure our task is erased outside the lock for the 767 // same reason we do this with delete_these_oustide_lock. 768 // Also, do it before calling set_running_task_info() so 769 // that sequence-checking from within the task's destructor 770 // still works. 771 task.task = Closure(); 772 773 this_worker->set_running_task_info( 774 SequenceToken(), CONTINUE_ON_SHUTDOWN); 775 } 776 DidRunWorkerTask(task); // Must be done inside the lock. 777 } else if (cleanup_state_ == CLEANUP_RUNNING) { 778 switch (status) { 779 case GET_WORK_WAIT: { 780 AutoUnlock unlock(lock_); 781 delete_these_outside_lock.clear(); 782 } 783 break; 784 case GET_WORK_NOT_FOUND: 785 CHECK(delete_these_outside_lock.empty()); 786 cleanup_state_ = CLEANUP_FINISHING; 787 cleanup_cv_.Broadcast(); 788 break; 789 default: 790 NOTREACHED(); 791 } 792 } else { 793 // When we're terminating and there's no more work, we can 794 // shut down, other workers can complete any pending or new tasks. 795 // We can get additional tasks posted after shutdown_called_ is set 796 // but only worker threads are allowed to post tasks at that time, and 797 // the workers responsible for posting those tasks will be available 798 // to run them. Also, there may be some tasks stuck behind running 799 // ones with the same sequence token, but additional threads won't 800 // help this case. 801 if (shutdown_called_ && 802 blocking_shutdown_pending_task_count_ == 0) 803 break; 804 waiting_thread_count_++; 805 806 switch (status) { 807 case GET_WORK_NOT_FOUND: 808 has_work_cv_.Wait(); 809 break; 810 case GET_WORK_WAIT: 811 has_work_cv_.TimedWait(wait_time); 812 break; 813 default: 814 NOTREACHED(); 815 } 816 waiting_thread_count_--; 817 } 818 } 819 } // Release lock_. 820 821 // We noticed we should exit. Wake up the next worker so it knows it should 822 // exit as well (because the Shutdown() code only signals once). 823 SignalHasWork(); 824 825 // Possibly unblock shutdown. 826 can_shutdown_cv_.Signal(); 827} 828 829void SequencedWorkerPool::Inner::HandleCleanup() { 830 lock_.AssertAcquired(); 831 if (cleanup_state_ == CLEANUP_DONE) 832 return; 833 if (cleanup_state_ == CLEANUP_REQUESTED) { 834 // We win, we get to do the cleanup as soon as the others wise up and idle. 835 cleanup_state_ = CLEANUP_STARTING; 836 while (thread_being_created_ || 837 cleanup_idlers_ != threads_.size() - 1) { 838 has_work_cv_.Signal(); 839 cleanup_cv_.Wait(); 840 } 841 cleanup_state_ = CLEANUP_RUNNING; 842 return; 843 } 844 if (cleanup_state_ == CLEANUP_STARTING) { 845 // Another worker thread is cleaning up, we idle here until thats done. 846 ++cleanup_idlers_; 847 cleanup_cv_.Broadcast(); 848 while (cleanup_state_ != CLEANUP_FINISHING) { 849 cleanup_cv_.Wait(); 850 } 851 --cleanup_idlers_; 852 cleanup_cv_.Broadcast(); 853 return; 854 } 855 if (cleanup_state_ == CLEANUP_FINISHING) { 856 // We wait for all idlers to wake up prior to being DONE. 857 while (cleanup_idlers_ != 0) { 858 cleanup_cv_.Broadcast(); 859 cleanup_cv_.Wait(); 860 } 861 if (cleanup_state_ == CLEANUP_FINISHING) { 862 cleanup_state_ = CLEANUP_DONE; 863 cleanup_cv_.Signal(); 864 } 865 return; 866 } 867} 868 869int SequencedWorkerPool::Inner::LockedGetNamedTokenID( 870 const std::string& name) { 871 lock_.AssertAcquired(); 872 DCHECK(!name.empty()); 873 874 std::map<std::string, int>::const_iterator found = 875 named_sequence_tokens_.find(name); 876 if (found != named_sequence_tokens_.end()) 877 return found->second; // Got an existing one. 878 879 // Create a new one for this name. 880 SequenceToken result = GetSequenceToken(); 881 named_sequence_tokens_.insert(std::make_pair(name, result.id_)); 882 return result.id_; 883} 884 885int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() { 886 lock_.AssertAcquired(); 887 // We assume that we never create enough tasks to wrap around. 888 return next_sequence_task_number_++; 889} 890 891SequencedWorkerPool::WorkerShutdown 892SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const { 893 lock_.AssertAcquired(); 894 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId()); 895 if (found == threads_.end()) 896 return CONTINUE_ON_SHUTDOWN; 897 return found->second->running_shutdown_behavior(); 898} 899 900SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork( 901 SequencedTask* task, 902 TimeDelta* wait_time, 903 std::vector<Closure>* delete_these_outside_lock) { 904 lock_.AssertAcquired(); 905 906#if !defined(OS_NACL) 907 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount", 908 static_cast<int>(pending_tasks_.size())); 909#endif 910 911 // Find the next task with a sequence token that's not currently in use. 912 // If the token is in use, that means another thread is running something 913 // in that sequence, and we can't run it without going out-of-order. 914 // 915 // This algorithm is simple and fair, but inefficient in some cases. For 916 // example, say somebody schedules 1000 slow tasks with the same sequence 917 // number. We'll have to go through all those tasks each time we feel like 918 // there might be work to schedule. If this proves to be a problem, we 919 // should make this more efficient. 920 // 921 // One possible enhancement would be to keep a map from sequence ID to a 922 // list of pending but currently blocked SequencedTasks for that ID. 923 // When a worker finishes a task of one sequence token, it can pick up the 924 // next one from that token right away. 925 // 926 // This may lead to starvation if there are sufficient numbers of sequences 927 // in use. To alleviate this, we could add an incrementing priority counter 928 // to each SequencedTask. Then maintain a priority_queue of all runnable 929 // tasks, sorted by priority counter. When a sequenced task is completed 930 // we would pop the head element off of that tasks pending list and add it 931 // to the priority queue. Then we would run the first item in the priority 932 // queue. 933 934 GetWorkStatus status = GET_WORK_NOT_FOUND; 935 int unrunnable_tasks = 0; 936 PendingTaskSet::iterator i = pending_tasks_.begin(); 937 // We assume that the loop below doesn't take too long and so we can just do 938 // a single call to TimeTicks::Now(). 939 const TimeTicks current_time = TimeTicks::Now(); 940 while (i != pending_tasks_.end()) { 941 if (!IsSequenceTokenRunnable(i->sequence_token_id)) { 942 unrunnable_tasks++; 943 ++i; 944 continue; 945 } 946 947 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) { 948 // We're shutting down and the task we just found isn't blocking 949 // shutdown. Delete it and get more work. 950 // 951 // Note that we do not want to delete unrunnable tasks. Deleting a task 952 // can have side effects (like freeing some objects) and deleting a 953 // task that's supposed to run after one that's currently running could 954 // cause an obscure crash. 955 // 956 // We really want to delete these tasks outside the lock in case the 957 // closures are holding refs to objects that want to post work from 958 // their destructorss (which would deadlock). The closures are 959 // internally refcounted, so we just need to keep a copy of them alive 960 // until the lock is exited. The calling code can just clear() the 961 // vector they passed to us once the lock is exited to make this 962 // happen. 963 delete_these_outside_lock->push_back(i->task); 964 pending_tasks_.erase(i++); 965 continue; 966 } 967 968 if (i->time_to_run > current_time) { 969 // The time to run has not come yet. 970 *wait_time = i->time_to_run - current_time; 971 status = GET_WORK_WAIT; 972 if (cleanup_state_ == CLEANUP_RUNNING) { 973 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop. 974 delete_these_outside_lock->push_back(i->task); 975 pending_tasks_.erase(i); 976 } 977 break; 978 } 979 980 // Found a runnable task. 981 *task = *i; 982 pending_tasks_.erase(i); 983 if (task->shutdown_behavior == BLOCK_SHUTDOWN) { 984 blocking_shutdown_pending_task_count_--; 985 } 986 987 status = GET_WORK_FOUND; 988 break; 989 } 990 991 // Track the number of tasks we had to skip over to see if we should be 992 // making this more efficient. If this number ever becomes large or is 993 // frequently "some", we should consider the optimization above. 994#if !defined(OS_NACL) 995 UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount", 996 unrunnable_tasks); 997#endif 998 return status; 999} 1000 1001int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) { 1002 lock_.AssertAcquired(); 1003 1004 // Mark the task's sequence number as in use. 1005 if (task.sequence_token_id) 1006 current_sequences_.insert(task.sequence_token_id); 1007 1008 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN 1009 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread 1010 // completes. 1011 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) 1012 blocking_shutdown_thread_count_++; 1013 1014 // We just picked up a task. Since StartAdditionalThreadIfHelpful only 1015 // creates a new thread if there is no free one, there is a race when posting 1016 // tasks that many tasks could have been posted before a thread started 1017 // running them, so only one thread would have been created. So we also check 1018 // whether we should create more threads after removing our task from the 1019 // queue, which also has the nice side effect of creating the workers from 1020 // background threads rather than the main thread of the app. 1021 // 1022 // If another thread wasn't created, we want to wake up an existing thread 1023 // if there is one waiting to pick up the next task. 1024 // 1025 // Note that we really need to do this *before* running the task, not 1026 // after. Otherwise, if more than one task is posted, the creation of the 1027 // second thread (since we only create one at a time) will be blocked by 1028 // the execution of the first task, which could be arbitrarily long. 1029 return PrepareToStartAdditionalThreadIfHelpful(); 1030} 1031 1032void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) { 1033 lock_.AssertAcquired(); 1034 1035 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) { 1036 DCHECK_GT(blocking_shutdown_thread_count_, 0u); 1037 blocking_shutdown_thread_count_--; 1038 } 1039 1040 if (task.sequence_token_id) 1041 current_sequences_.erase(task.sequence_token_id); 1042} 1043 1044bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable( 1045 int sequence_token_id) const { 1046 lock_.AssertAcquired(); 1047 return !sequence_token_id || 1048 current_sequences_.find(sequence_token_id) == 1049 current_sequences_.end(); 1050} 1051 1052int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() { 1053 lock_.AssertAcquired(); 1054 // How thread creation works: 1055 // 1056 // We'de like to avoid creating threads with the lock held. However, we 1057 // need to be sure that we have an accurate accounting of the threads for 1058 // proper Joining and deltion on shutdown. 1059 // 1060 // We need to figure out if we need another thread with the lock held, which 1061 // is what this function does. It then marks us as in the process of creating 1062 // a thread. When we do shutdown, we wait until the thread_being_created_ 1063 // flag is cleared, which ensures that the new thread is properly added to 1064 // all the data structures and we can't leak it. Once shutdown starts, we'll 1065 // refuse to create more threads or they would be leaked. 1066 // 1067 // Note that this creates a mostly benign race condition on shutdown that 1068 // will cause fewer workers to be created than one would expect. It isn't 1069 // much of an issue in real life, but affects some tests. Since we only spawn 1070 // one worker at a time, the following sequence of events can happen: 1071 // 1072 // 1. Main thread posts a bunch of unrelated tasks that would normally be 1073 // run on separate threads. 1074 // 2. The first task post causes us to start a worker. Other tasks do not 1075 // cause a worker to start since one is pending. 1076 // 3. Main thread initiates shutdown. 1077 // 4. No more threads are created since the shutdown_called_ flag is set. 1078 // 1079 // The result is that one may expect that max_threads_ workers to be created 1080 // given the workload, but in reality fewer may be created because the 1081 // sequence of thread creation on the background threads is racing with the 1082 // shutdown call. 1083 if (!shutdown_called_ && 1084 !thread_being_created_ && 1085 cleanup_state_ == CLEANUP_DONE && 1086 threads_.size() < max_threads_ && 1087 waiting_thread_count_ == 0) { 1088 // We could use an additional thread if there's work to be done. 1089 for (PendingTaskSet::const_iterator i = pending_tasks_.begin(); 1090 i != pending_tasks_.end(); ++i) { 1091 if (IsSequenceTokenRunnable(i->sequence_token_id)) { 1092 // Found a runnable task, mark the thread as being started. 1093 thread_being_created_ = true; 1094 return static_cast<int>(threads_.size() + 1); 1095 } 1096 } 1097 } 1098 return 0; 1099} 1100 1101void SequencedWorkerPool::Inner::FinishStartingAdditionalThread( 1102 int thread_number) { 1103 // Called outside of the lock. 1104 DCHECK(thread_number > 0); 1105 1106 // The worker is assigned to the list when the thread actually starts, which 1107 // will manage the memory of the pointer. 1108 new Worker(worker_pool_, thread_number, thread_name_prefix_); 1109} 1110 1111void SequencedWorkerPool::Inner::SignalHasWork() { 1112 has_work_cv_.Signal(); 1113 if (testing_observer_) { 1114 testing_observer_->OnHasWork(); 1115 } 1116} 1117 1118bool SequencedWorkerPool::Inner::CanShutdown() const { 1119 lock_.AssertAcquired(); 1120 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works. 1121 return !thread_being_created_ && 1122 blocking_shutdown_thread_count_ == 0 && 1123 blocking_shutdown_pending_task_count_ == 0; 1124} 1125 1126base::StaticAtomicSequenceNumber 1127SequencedWorkerPool::Inner::g_last_sequence_number_; 1128 1129// SequencedWorkerPool -------------------------------------------------------- 1130 1131// static 1132SequencedWorkerPool::SequenceToken 1133SequencedWorkerPool::GetSequenceTokenForCurrentThread() { 1134 // Don't construct lazy instance on check. 1135 if (g_lazy_tls_ptr == NULL) 1136 return SequenceToken(); 1137 1138 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get(); 1139 if (!token) 1140 return SequenceToken(); 1141 return *token; 1142} 1143 1144SequencedWorkerPool::SequencedWorkerPool( 1145 size_t max_threads, 1146 const std::string& thread_name_prefix) 1147 : constructor_message_loop_(MessageLoopProxy::current()), 1148 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) { 1149} 1150 1151SequencedWorkerPool::SequencedWorkerPool( 1152 size_t max_threads, 1153 const std::string& thread_name_prefix, 1154 TestingObserver* observer) 1155 : constructor_message_loop_(MessageLoopProxy::current()), 1156 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) { 1157} 1158 1159SequencedWorkerPool::~SequencedWorkerPool() {} 1160 1161void SequencedWorkerPool::OnDestruct() const { 1162 DCHECK(constructor_message_loop_.get()); 1163 // Avoid deleting ourselves on a worker thread (which would 1164 // deadlock). 1165 if (RunsTasksOnCurrentThread()) { 1166 constructor_message_loop_->DeleteSoon(FROM_HERE, this); 1167 } else { 1168 delete this; 1169 } 1170} 1171 1172SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() { 1173 return inner_->GetSequenceToken(); 1174} 1175 1176SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken( 1177 const std::string& name) { 1178 return inner_->GetNamedSequenceToken(name); 1179} 1180 1181scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner( 1182 SequenceToken token) { 1183 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN); 1184} 1185 1186scoped_refptr<SequencedTaskRunner> 1187SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior( 1188 SequenceToken token, WorkerShutdown shutdown_behavior) { 1189 return new SequencedWorkerPoolSequencedTaskRunner( 1190 this, token, shutdown_behavior); 1191} 1192 1193scoped_refptr<TaskRunner> 1194SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior( 1195 WorkerShutdown shutdown_behavior) { 1196 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior); 1197} 1198 1199bool SequencedWorkerPool::PostWorkerTask( 1200 const tracked_objects::Location& from_here, 1201 const Closure& task) { 1202 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN, 1203 from_here, task, TimeDelta()); 1204} 1205 1206bool SequencedWorkerPool::PostDelayedWorkerTask( 1207 const tracked_objects::Location& from_here, 1208 const Closure& task, 1209 TimeDelta delay) { 1210 WorkerShutdown shutdown_behavior = 1211 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1212 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1213 from_here, task, delay); 1214} 1215 1216bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior( 1217 const tracked_objects::Location& from_here, 1218 const Closure& task, 1219 WorkerShutdown shutdown_behavior) { 1220 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior, 1221 from_here, task, TimeDelta()); 1222} 1223 1224bool SequencedWorkerPool::PostSequencedWorkerTask( 1225 SequenceToken sequence_token, 1226 const tracked_objects::Location& from_here, 1227 const Closure& task) { 1228 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN, 1229 from_here, task, TimeDelta()); 1230} 1231 1232bool SequencedWorkerPool::PostDelayedSequencedWorkerTask( 1233 SequenceToken sequence_token, 1234 const tracked_objects::Location& from_here, 1235 const Closure& task, 1236 TimeDelta delay) { 1237 WorkerShutdown shutdown_behavior = 1238 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN; 1239 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1240 from_here, task, delay); 1241} 1242 1243bool SequencedWorkerPool::PostNamedSequencedWorkerTask( 1244 const std::string& token_name, 1245 const tracked_objects::Location& from_here, 1246 const Closure& task) { 1247 DCHECK(!token_name.empty()); 1248 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN, 1249 from_here, task, TimeDelta()); 1250} 1251 1252bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior( 1253 SequenceToken sequence_token, 1254 const tracked_objects::Location& from_here, 1255 const Closure& task, 1256 WorkerShutdown shutdown_behavior) { 1257 return inner_->PostTask(NULL, sequence_token, shutdown_behavior, 1258 from_here, task, TimeDelta()); 1259} 1260 1261bool SequencedWorkerPool::PostDelayedTask( 1262 const tracked_objects::Location& from_here, 1263 const Closure& task, 1264 TimeDelta delay) { 1265 return PostDelayedWorkerTask(from_here, task, delay); 1266} 1267 1268bool SequencedWorkerPool::RunsTasksOnCurrentThread() const { 1269 return inner_->RunsTasksOnCurrentThread(); 1270} 1271 1272bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread( 1273 SequenceToken sequence_token) const { 1274 return inner_->IsRunningSequenceOnCurrentThread(sequence_token); 1275} 1276 1277void SequencedWorkerPool::FlushForTesting() { 1278 inner_->CleanupForTesting(); 1279} 1280 1281void SequencedWorkerPool::SignalHasWorkForTesting() { 1282 inner_->SignalHasWorkForTesting(); 1283} 1284 1285void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) { 1286 DCHECK(constructor_message_loop_->BelongsToCurrentThread()); 1287 inner_->Shutdown(max_new_blocking_tasks_after_shutdown); 1288} 1289 1290bool SequencedWorkerPool::IsShutdownInProgress() { 1291 return inner_->IsShutdownInProgress(); 1292} 1293 1294} // namespace base 1295