// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "cc/resources/worker_pool.h"

#include <algorithm>
#include <queue>

#include "base/bind.h"
#include "base/containers/hash_tables.h"
#include "base/debug/trace_event.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/condition_variable.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_restrictions.h"
#include "cc/base/scoped_ptr_deque.h"

namespace cc {

namespace internal {

WorkerPoolTask::WorkerPoolTask()
    : did_schedule_(false),
      did_run_(false),
      did_complete_(false) {
}

WorkerPoolTask::~WorkerPoolTask() {
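  // Lifecycle invariants checked below: a scheduled task must eventually
  // complete, and a task can only have run if it was scheduled and has
  // completed by the time it is destroyed.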
  DCHECK_EQ(did_schedule_, did_complete_);
  DCHECK(!did_run_ || did_schedule_);
  DCHECK(!did_run_ || did_complete_);
}

void WorkerPoolTask::DidSchedule() {
  DCHECK(!did_complete_);
  did_schedule_ = true;
}

void WorkerPoolTask::WillRun() {
  DCHECK(did_schedule_);
  DCHECK(!did_complete_);
  DCHECK(!did_run_);
}

void WorkerPoolTask::DidRun() {
  did_run_ = true;
}

void WorkerPoolTask::WillComplete() {
  DCHECK(!did_complete_);
}

void WorkerPoolTask::DidComplete() {
  DCHECK(did_schedule_);
  DCHECK(!did_complete_);
  did_complete_ = true;
}

bool WorkerPoolTask::HasFinishedRunning() const {
  return did_run_;
}

bool WorkerPoolTask::HasCompleted() const {
  return did_complete_;
}

GraphNode::GraphNode(internal::WorkerPoolTask* task, unsigned priority)
    : task_(task),
      priority_(priority),
      num_dependencies_(0) {
}

GraphNode::~GraphNode() {
}

}  // namespace internal

// Internal to the worker pool. Any data or logic that needs to be
// shared between threads lives in this class. All members are guarded
// by |lock_|.
class WorkerPool::Inner : public base::DelegateSimpleThread::Delegate {
 public:
  Inner(size_t num_threads, const std::string& thread_name_prefix);
  virtual ~Inner();

  void Shutdown();

  // Schedule running of tasks in |graph|. Tasks previously scheduled but
  // no longer needed will be canceled unless already running. Canceled
  // tasks are moved to |completed_tasks_| without being run. The result
  // is that once scheduled, a task is guaranteed to end up in the
  // |completed_tasks_| queue even if it is later canceled by another
  // call to SetTaskGraph().
  void SetTaskGraph(TaskGraph* graph);

  // Collect all completed tasks in |completed_tasks|.
  void CollectCompletedTasks(TaskVector* completed_tasks);

 private:
  class PriorityComparator {
   public:
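    // std::priority_queue pops the "largest" element first, so returning
    // true here means |a| should run after |b|.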
    bool operator()(const internal::GraphNode* a,
                    const internal::GraphNode* b) {
      // In this system, numerically lower priority is run first.
      if (a->priority() != b->priority())
        return a->priority() > b->priority();

      // Run task with most dependents first when priority is the same.
      return a->dependents().size() < b->dependents().size();
    }
  };

  // Overridden from base::DelegateSimpleThread::Delegate:
  virtual void Run() OVERRIDE;

  // This lock protects all members of this class. Do not read or modify
  // anything without holding this lock. Do not block while holding this
  // lock.
  mutable base::Lock lock_;

  // Condition variable that is waited on by worker threads until new
  // tasks are ready to run or shutdown starts.
  base::ConditionVariable has_ready_to_run_tasks_cv_;

  // Provides each worker thread's run loop with a unique index. The first
  // thread gets index 0.
  unsigned next_thread_index_;

  // Set during shutdown. Tells workers to exit when no more tasks
  // are pending.
  bool shutdown_;

  // This set contains all pending tasks.
  GraphNodeMap pending_tasks_;

  // Ordered set of tasks that are ready to run.
  typedef std::priority_queue<internal::GraphNode*,
                              std::vector<internal::GraphNode*>,
                              PriorityComparator> TaskQueue;
  TaskQueue ready_to_run_tasks_;

  // This set contains all currently running tasks.
  GraphNodeMap running_tasks_;

  // Completed tasks not yet collected by origin thread.
  TaskVector completed_tasks_;

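  // Worker threads owned by this class. Started in the constructor and
  // joined in Shutdown().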
  ScopedPtrDeque<base::DelegateSimpleThread> workers_;

  DISALLOW_COPY_AND_ASSIGN(Inner);
};

WorkerPool::Inner::Inner(
    size_t num_threads, const std::string& thread_name_prefix)
    : lock_(),
      has_ready_to_run_tasks_cv_(&lock_),
      next_thread_index_(0),
      shutdown_(false) {
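  // Note: each worker acquires |lock_| as soon as it enters Run(), so
  // holding it here keeps the workers from making progress until
  // construction has finished.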
  base::AutoLock lock(lock_);

  while (workers_.size() < num_threads) {
    scoped_ptr<base::DelegateSimpleThread> worker = make_scoped_ptr(
        new base::DelegateSimpleThread(
            this,
            thread_name_prefix +
            base::StringPrintf(
                "Worker%u",
                static_cast<unsigned>(workers_.size() + 1)).c_str()));
    worker->Start();
#if defined(OS_ANDROID) || defined(OS_LINUX)
    worker->SetThreadPriority(base::kThreadPriority_Background);
#endif
    workers_.push_back(worker.Pass());
  }
}

WorkerPool::Inner::~Inner() {
  base::AutoLock lock(lock_);

  DCHECK(shutdown_);

  DCHECK_EQ(0u, pending_tasks_.size());
  DCHECK_EQ(0u, ready_to_run_tasks_.size());
  DCHECK_EQ(0u, running_tasks_.size());
  DCHECK_EQ(0u, completed_tasks_.size());
}

void WorkerPool::Inner::Shutdown() {
  {
    base::AutoLock lock(lock_);

    DCHECK(!shutdown_);
    shutdown_ = true;

    // Wake up a worker so it knows it should exit. This will cause all workers
    // to exit as each will wake up another worker before exiting.
    has_ready_to_run_tasks_cv_.Signal();
  }

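  // Join the workers outside of |lock_|; each worker needs to acquire it
  // in Run() to observe |shutdown_| and exit.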
  while (workers_.size()) {
    scoped_ptr<base::DelegateSimpleThread> worker = workers_.take_front();
    // http://crbug.com/240453 - Join() is considered IO and will block this
    // thread. See also http://crbug.com/239423 for further ideas.
    base::ThreadRestrictions::ScopedAllowIO allow_io;
    worker->Join();
  }
}

void WorkerPool::Inner::SetTaskGraph(TaskGraph* graph) {
  // It is OK to call SetTaskGraph() after shutdown if |graph| is empty.
  DCHECK(graph->empty() || !shutdown_);

  GraphNodeMap new_pending_tasks;
  GraphNodeMap new_running_tasks;
  TaskQueue new_ready_to_run_tasks;

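  // Take ownership of all nodes in |graph|. The caller's graph is left
  // empty as a result of this swap.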
  new_pending_tasks.swap(*graph);

  {
    base::AutoLock lock(lock_);

    // First remove all completed tasks from |new_pending_tasks| and
    // adjust number of dependencies.
    for (TaskVector::iterator it = completed_tasks_.begin();
         it != completed_tasks_.end(); ++it) {
      internal::WorkerPoolTask* task = it->get();

      scoped_ptr<internal::GraphNode> node = new_pending_tasks.take_and_erase(
          task);
      if (node) {
        for (internal::GraphNode::Vector::const_iterator it =
                 node->dependents().begin();
             it != node->dependents().end(); ++it) {
          internal::GraphNode* dependent_node = *it;
          dependent_node->remove_dependency();
        }
      }
    }

    // Build new running task set.
    for (GraphNodeMap::iterator it = running_tasks_.begin();
         it != running_tasks_.end(); ++it) {
      internal::WorkerPoolTask* task = it->first;
      // Transfer the scheduled task value from |new_pending_tasks| to
      // |new_running_tasks| if the task is currently running. The value
      // must be set to NULL if |new_pending_tasks| doesn't contain the
      // task. take_and_erase() does the right thing in both cases.
      new_running_tasks.set(task, new_pending_tasks.take_and_erase(task));
    }

    // Build new "ready to run" tasks queue.
    // TODO(reveman): Create this queue when building the task graph instead.
    for (GraphNodeMap::iterator it = new_pending_tasks.begin();
         it != new_pending_tasks.end(); ++it) {
      internal::WorkerPoolTask* task = it->first;
      DCHECK(task);
      internal::GraphNode* node = it->second;

      // Completed tasks should not exist in |new_pending_tasks|.
      DCHECK(!task->HasFinishedRunning());

      // Call DidSchedule() to indicate that this task has been scheduled.
      // Note: This is only for debugging purposes.
      task->DidSchedule();

      if (!node->num_dependencies())
        new_ready_to_run_tasks.push(node);

      // Erase the task from old pending tasks.
      pending_tasks_.erase(task);
    }

    completed_tasks_.reserve(completed_tasks_.size() + pending_tasks_.size());

    // The items left in |pending_tasks_| need to be canceled.
    for (GraphNodeMap::const_iterator it = pending_tasks_.begin();
         it != pending_tasks_.end();
         ++it) {
      completed_tasks_.push_back(it->first);
    }

    // Swap task sets.
    // Note: old tasks are intentionally destroyed after releasing |lock_|.
    pending_tasks_.swap(new_pending_tasks);
    running_tasks_.swap(new_running_tasks);
    std::swap(ready_to_run_tasks_, new_ready_to_run_tasks);

    // If |ready_to_run_tasks_| is empty, it means we either have
    // running tasks, or we have no pending tasks.
    DCHECK(!ready_to_run_tasks_.empty() ||
           (pending_tasks_.empty() || !running_tasks_.empty()));

    // If there is more work available, wake up worker thread.
    if (!ready_to_run_tasks_.empty())
      has_ready_to_run_tasks_cv_.Signal();
  }
}

void WorkerPool::Inner::CollectCompletedTasks(TaskVector* completed_tasks) {
  base::AutoLock lock(lock_);

  DCHECK_EQ(0u, completed_tasks->size());
  completed_tasks->swap(completed_tasks_);
}

void WorkerPool::Inner::Run() {
  base::AutoLock lock(lock_);

  // Get a unique thread index.
  int thread_index = next_thread_index_++;

  while (true) {
    if (ready_to_run_tasks_.empty()) {
      // Exit when shutdown is set and no more tasks are pending.
      if (shutdown_ && pending_tasks_.empty())
        break;

      // Wait for more tasks.
      has_ready_to_run_tasks_cv_.Wait();
      continue;
    }

    // Take top priority task from |ready_to_run_tasks_|.
    scoped_refptr<internal::WorkerPoolTask> task(
        ready_to_run_tasks_.top()->task());
    ready_to_run_tasks_.pop();

    // Move task from |pending_tasks_| to |running_tasks_|.
    DCHECK(pending_tasks_.contains(task.get()));
    DCHECK(!running_tasks_.contains(task.get()));
    running_tasks_.set(task.get(), pending_tasks_.take_and_erase(task.get()));

    // There may be more work available, so wake up another worker thread.
    has_ready_to_run_tasks_cv_.Signal();

    // Call WillRun() before releasing |lock_| and running task.
    task->WillRun();

    {
      base::AutoUnlock unlock(lock_);

      task->RunOnWorkerThread(thread_index);
    }

    // This will mark task as finished running.
    task->DidRun();

    // Now iterate over all dependents to remove dependency and check
    // if they are ready to run.
    scoped_ptr<internal::GraphNode> node = running_tasks_.take_and_erase(
        task.get());
    if (node) {
      for (internal::GraphNode::Vector::const_iterator it =
               node->dependents().begin();
           it != node->dependents().end(); ++it) {
        internal::GraphNode* dependent_node = *it;

        dependent_node->remove_dependency();
        // Task is ready if it has no dependencies. Add it to
        // |ready_to_run_tasks_|.
        if (!dependent_node->num_dependencies())
          ready_to_run_tasks_.push(dependent_node);
      }
    }

    // Finally add task to |completed_tasks_|.
    completed_tasks_.push_back(task);
  }

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  has_ready_to_run_tasks_cv_.Signal();
}

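// Overall flow (an informal sketch; the concrete client lives outside this
// file): a client builds a TaskGraph describing tasks and their
// dependencies, hands it to SetTaskGraph(), and periodically calls
// CheckForCompletedTasks() on the origin thread to dispatch completion
// callbacks. Shutdown() must be called before the pool is destroyed.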
WorkerPool::WorkerPool(size_t num_threads,
                       const std::string& thread_name_prefix)
    : in_dispatch_completion_callbacks_(false),
      inner_(make_scoped_ptr(new Inner(num_threads, thread_name_prefix))) {
}

WorkerPool::~WorkerPool() {
}

void WorkerPool::Shutdown() {
  TRACE_EVENT0("cc", "WorkerPool::Shutdown");

  DCHECK(!in_dispatch_completion_callbacks_);

  inner_->Shutdown();
}

void WorkerPool::CheckForCompletedTasks() {
  TRACE_EVENT0("cc", "WorkerPool::CheckForCompletedTasks");

  DCHECK(!in_dispatch_completion_callbacks_);

  TaskVector completed_tasks;
  inner_->CollectCompletedTasks(&completed_tasks);
  ProcessCompletedTasks(completed_tasks);
}

void WorkerPool::ProcessCompletedTasks(
    const TaskVector& completed_tasks) {
  TRACE_EVENT1("cc", "WorkerPool::ProcessCompletedTasks",
               "completed_task_count", completed_tasks.size());

  // Worker pool instance is not reentrant while processing completed tasks.
  in_dispatch_completion_callbacks_ = true;

  for (TaskVector::const_iterator it = completed_tasks.begin();
       it != completed_tasks.end();
       ++it) {
    internal::WorkerPoolTask* task = it->get();

    task->WillComplete();
    task->CompleteOnOriginThread();
    task->DidComplete();
  }

  in_dispatch_completion_callbacks_ = false;
}

void WorkerPool::SetTaskGraph(TaskGraph* graph) {
  TRACE_EVENT1("cc", "WorkerPool::SetTaskGraph",
               "num_tasks", graph->size());

  DCHECK(!in_dispatch_completion_callbacks_);

  inner_->SetTaskGraph(graph);
}

}  // namespace cc