1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "remoting/base/plugin_thread_task_runner.h"
6
7#include "base/bind.h"
8
9namespace {
10
11base::TimeDelta CalcTimeDelta(base::TimeTicks when) {
12  return std::max(when - base::TimeTicks::Now(), base::TimeDelta());
13}
14
15}  // namespace
16
17namespace remoting {
18
19PluginThreadTaskRunner::Delegate::~Delegate() {
20}
21
22PluginThreadTaskRunner::PluginThreadTaskRunner(Delegate* delegate)
23    : plugin_thread_id_(base::PlatformThread::CurrentId()),
24      event_(false, false),
25      delegate_(delegate),
26      next_sequence_num_(0),
27      quit_received_(false),
28      stopped_(false) {
29}
30
31PluginThreadTaskRunner::~PluginThreadTaskRunner() {
32  DCHECK(delegate_ == NULL);
33  DCHECK(stopped_);
34}
35
36void PluginThreadTaskRunner::DetachAndRunShutdownLoop() {
37  DCHECK(BelongsToCurrentThread());
38
39  // Detach from the plugin thread and redirect all tasks posted after this
40  // point to the shutdown task loop.
41  {
42    base::AutoLock auto_lock(lock_);
43
44    DCHECK(delegate_ != NULL);
45    DCHECK(!stopped_);
46
47    delegate_ = NULL;
48    stopped_ = quit_received_;
49  }
50
51  // When DetachAndRunShutdownLoop() is called from NPP_Destroy() all scheduled
52  // timers are cancelled. It is OK to clear |scheduled_timers_| even if
53  // the timers weren't actually cancelled (i.e. DetachAndRunShutdownLoop() is
54  // called before NPP_Destroy()).
55  scheduled_timers_.clear();
56
57  // Run all tasks that are due.
58  ProcessIncomingTasks();
59  RunDueTasks(base::TimeTicks::Now());
60
61  while (!stopped_) {
62    if (delayed_queue_.empty()) {
63      event_.Wait();
64    } else {
65      event_.TimedWait(CalcTimeDelta(delayed_queue_.top().delayed_run_time));
66    }
67
68    // Run all tasks that are due.
69    ProcessIncomingTasks();
70    RunDueTasks(base::TimeTicks::Now());
71
72    base::AutoLock auto_lock(lock_);
73    stopped_ = quit_received_;
74  }
75}
76
77void PluginThreadTaskRunner::Quit() {
78  base::AutoLock auto_lock(lock_);
79
80  if (!quit_received_) {
81    quit_received_ = true;
82    event_.Signal();
83  }
84}
85
86bool PluginThreadTaskRunner::PostDelayedTask(
87    const tracked_objects::Location& from_here,
88    const base::Closure& task,
89    base::TimeDelta delay) {
90
91  // Wrap the task into |base::PendingTask|.
92  base::TimeTicks delayed_run_time;
93  if (delay > base::TimeDelta()) {
94    delayed_run_time = base::TimeTicks::Now() + delay;
95  } else {
96    DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
97  }
98
99  base::PendingTask pending_task(from_here, task, delayed_run_time, false);
100
101  // Push the task to the incoming queue.
102  base::AutoLock locked(lock_);
103
104  // Initialize the sequence number. The sequence number provides FIFO ordering
105  // for tasks with the same |delayed_run_time|.
106  pending_task.sequence_num = next_sequence_num_++;
107
108  // Post an asynchronous call on the plugin thread to process the task.
109  if (incoming_queue_.empty()) {
110    PostRunTasks();
111  }
112
113  incoming_queue_.push(pending_task);
114  pending_task.task.Reset();
115
116  // No tasks should be posted after Quit() has been called.
117  DCHECK(!quit_received_);
118  return true;
119}
120
121bool PluginThreadTaskRunner::PostNonNestableDelayedTask(
122    const tracked_objects::Location& from_here,
123    const base::Closure& task,
124    base::TimeDelta delay) {
125  // All tasks running on this task loop are non-nestable.
126  return PostDelayedTask(from_here, task, delay);
127}
128
129bool PluginThreadTaskRunner::RunsTasksOnCurrentThread() const {
130  // In pepper plugins ideally we should use pp::Core::IsMainThread,
131  // but it is problematic because we would need to keep reference to
132  // Core somewhere, e.g. make the delegate ref-counted.
133  return base::PlatformThread::CurrentId() == plugin_thread_id_;
134}
135
136void PluginThreadTaskRunner::PostRunTasks() {
137  // Post tasks to the plugin thread when it is availabe or spin the shutdown
138  // task loop.
139  if (delegate_ != NULL) {
140    base::Closure closure = base::Bind(&PluginThreadTaskRunner::RunTasks, this);
141    delegate_->RunOnPluginThread(
142        base::TimeDelta(),
143        &PluginThreadTaskRunner::TaskSpringboard,
144        new base::Closure(closure));
145  } else {
146    event_.Signal();
147  }
148}
149
150void PluginThreadTaskRunner::PostDelayedRunTasks(base::TimeTicks when) {
151  DCHECK(BelongsToCurrentThread());
152
153  // |delegate_| is updated from the plugin thread only, so it is safe to access
154  // it here without taking the lock.
155  if (delegate_ != NULL) {
156    // Schedule RunDelayedTasks() to be called at |when| if it hasn't been
157    // scheduled already.
158    if (scheduled_timers_.insert(when).second) {
159      base::TimeDelta delay = CalcTimeDelta(when);
160      base::Closure closure =
161          base::Bind(&PluginThreadTaskRunner::RunDelayedTasks, this, when);
162      delegate_->RunOnPluginThread(
163          delay,
164          &PluginThreadTaskRunner::TaskSpringboard,
165          new base::Closure(closure));
166    }
167  } else {
168    // Spin the shutdown loop if the task runner has already been detached.
169    // The shutdown loop will pick the tasks to run itself.
170    event_.Signal();
171  }
172}
173
174void PluginThreadTaskRunner::ProcessIncomingTasks() {
175  DCHECK(BelongsToCurrentThread());
176
177  // Grab all unsorted tasks accomulated so far.
178  base::TaskQueue work_queue;
179  {
180    base::AutoLock locked(lock_);
181    incoming_queue_.Swap(&work_queue);
182  }
183
184  while (!work_queue.empty()) {
185    base::PendingTask pending_task = work_queue.front();
186    work_queue.pop();
187
188    if (pending_task.delayed_run_time.is_null()) {
189      pending_task.task.Run();
190    } else {
191      delayed_queue_.push(pending_task);
192    }
193  }
194}
195
196void PluginThreadTaskRunner::RunDelayedTasks(base::TimeTicks when) {
197  DCHECK(BelongsToCurrentThread());
198
199  scheduled_timers_.erase(when);
200
201  // |stopped_| is updated by the plugin thread only, so it is safe to access
202  // it here without taking the lock.
203  if (!stopped_) {
204    ProcessIncomingTasks();
205    RunDueTasks(base::TimeTicks::Now());
206  }
207}
208
209void PluginThreadTaskRunner::RunDueTasks(base::TimeTicks now) {
210  DCHECK(BelongsToCurrentThread());
211
212  // Run all due tasks.
213  while (!delayed_queue_.empty() &&
214         delayed_queue_.top().delayed_run_time <= now) {
215    delayed_queue_.top().task.Run();
216    delayed_queue_.pop();
217  }
218
219  // Post a delayed asynchronous call to the plugin thread to process tasks from
220  // the delayed queue.
221  if (!delayed_queue_.empty()) {
222    base::TimeTicks when = delayed_queue_.top().delayed_run_time;
223    if (scheduled_timers_.empty() || when < *scheduled_timers_.begin()) {
224      PostDelayedRunTasks(when);
225    }
226  }
227}
228
229void PluginThreadTaskRunner::RunTasks() {
230  DCHECK(BelongsToCurrentThread());
231
232  // |stopped_| is updated by the plugin thread only, so it is safe to access
233  // it here without taking the lock.
234  if (!stopped_) {
235    ProcessIncomingTasks();
236    RunDueTasks(base::TimeTicks::Now());
237  }
238}
239
240// static
241void PluginThreadTaskRunner::TaskSpringboard(void* data) {
242  base::Closure* task = reinterpret_cast<base::Closure*>(data);
243  task->Run();
244  delete task;
245}
246
247}  // namespace remoting
248