balanced_media_task_runner_factory.cc revision 1320f92c476a1ad9d19dba2a48c72b75566198e9
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chromecast/media/cma/base/balanced_media_task_runner_factory.h"
6
7#include <map>
8
9#include "base/bind.h"
10#include "base/callback_helpers.h"
11#include "base/logging.h"
12#include "base/single_thread_task_runner.h"
13#include "chromecast/media/cma/base/media_task_runner.h"
14#include "media/base/buffers.h"
15
16namespace chromecast {
17namespace media {
18
19// MediaTaskRunnerWithNotification -
20// Media task runner which also behaves as a media task runner observer.
21class MediaTaskRunnerWithNotification : public MediaTaskRunner {
22 public:
23  // Wraps a MediaTaskRunner so that a third party can:
24  // - be notified when a PostMediaTask is performed on this media task runner.
25  //   |new_task_cb| is invoked in that case.
26  // - monitor the lifetime of the media task runner, i.e. check when the media
27  //   task runner is not needed anymore.
28  //   |shutdown_cb| is invoked in that case.
29  MediaTaskRunnerWithNotification(
30      const scoped_refptr<MediaTaskRunner>& media_task_runner,
31      const base::Closure& new_task_cb,
32      const base::Closure& shutdown_cb);
33
34  // MediaTaskRunner implementation.
35  virtual bool PostMediaTask(
36      const tracked_objects::Location& from_here,
37      const base::Closure& task,
38      base::TimeDelta timestamp) OVERRIDE;
39
40 private:
41  virtual ~MediaTaskRunnerWithNotification();
42
43  scoped_refptr<MediaTaskRunner> const media_task_runner_;
44
45  const base::Closure new_task_cb_;
46  const base::Closure shutdown_cb_;
47
48  DISALLOW_COPY_AND_ASSIGN(MediaTaskRunnerWithNotification);
49};
50
51MediaTaskRunnerWithNotification::MediaTaskRunnerWithNotification(
52    const scoped_refptr<MediaTaskRunner>& media_task_runner,
53    const base::Closure& new_task_cb,
54    const base::Closure& shutdown_cb)
55  : media_task_runner_(media_task_runner),
56    new_task_cb_(new_task_cb),
57    shutdown_cb_(shutdown_cb) {
58}
59
60MediaTaskRunnerWithNotification::~MediaTaskRunnerWithNotification() {
61  shutdown_cb_.Run();
62}
63
64bool MediaTaskRunnerWithNotification::PostMediaTask(
65    const tracked_objects::Location& from_here,
66    const base::Closure& task,
67    base::TimeDelta timestamp) {
68  bool may_run_in_future =
69      media_task_runner_->PostMediaTask(from_here, task, timestamp);
70  if (may_run_in_future)
71    new_task_cb_.Run();
72  return may_run_in_future;
73}
74
75
76// BalancedMediaTaskRunner -
77// Run media tasks whose timestamp is less or equal to a max timestamp.
78//
79// Restrictions of BalancedMediaTaskRunner:
80// - Can have at most one task in the queue.
81// - Tasks should be given by increasing timestamps.
82class BalancedMediaTaskRunner
83    : public MediaTaskRunner {
84 public:
85  explicit BalancedMediaTaskRunner(
86      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner);
87
88  // Schedule tasks whose timestamp is less than or equal to |max_timestamp|.
89  void ScheduleWork(base::TimeDelta max_timestamp);
90
91  // Return the timestamp of the last media task.
92  // Return ::media::kNoTimestamp() if no media task has been posted.
93  base::TimeDelta GetMediaTimestamp() const;
94
95  // MediaTaskRunner implementation.
96  virtual bool PostMediaTask(
97      const tracked_objects::Location& from_here,
98      const base::Closure& task,
99      base::TimeDelta timestamp) OVERRIDE;
100
101 private:
102  virtual ~BalancedMediaTaskRunner();
103
104  scoped_refptr<base::SingleThreadTaskRunner> const task_runner_;
105
106  // Protects the following variables.
107  mutable base::Lock lock_;
108
109  // Possible pending media task.
110  tracked_objects::Location from_here_;
111  base::Closure pending_task_;
112
113  // Timestamp of the last posted task.
114  // Is initialized to ::media::kNoTimestamp().
115  base::TimeDelta last_timestamp_;
116
117  DISALLOW_COPY_AND_ASSIGN(BalancedMediaTaskRunner);
118};
119
120BalancedMediaTaskRunner::BalancedMediaTaskRunner(
121    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
122  : task_runner_(task_runner),
123    last_timestamp_(::media::kNoTimestamp()) {
124}
125
126BalancedMediaTaskRunner::~BalancedMediaTaskRunner() {
127}
128
129void BalancedMediaTaskRunner::ScheduleWork(base::TimeDelta max_media_time) {
130  base::Closure task;
131  {
132    base::AutoLock auto_lock(lock_);
133    if (pending_task_.is_null())
134      return;
135
136    if (last_timestamp_ != ::media::kNoTimestamp() &&
137        last_timestamp_ >= max_media_time) {
138      return;
139    }
140
141    task = base::ResetAndReturn(&pending_task_);
142  }
143  task_runner_->PostTask(from_here_, task);
144}
145
146base::TimeDelta BalancedMediaTaskRunner::GetMediaTimestamp() const {
147  base::AutoLock auto_lock(lock_);
148  return last_timestamp_;
149}
150
151bool BalancedMediaTaskRunner::PostMediaTask(
152    const tracked_objects::Location& from_here,
153    const base::Closure& task,
154    base::TimeDelta timestamp) {
155  DCHECK(!task.is_null());
156
157  // Pass through for a task with no timestamp.
158  if (timestamp == ::media::kNoTimestamp()) {
159    return task_runner_->PostTask(from_here, task);
160  }
161
162  base::AutoLock auto_lock(lock_);
163
164  // Timestamps must be in order.
165  // Any task that does not meet that condition is simply discarded.
166  if (last_timestamp_ != ::media::kNoTimestamp() &&
167      timestamp < last_timestamp_) {
168    return false;
169  }
170
171  // Only support one pending task at a time.
172  DCHECK(pending_task_.is_null());
173  from_here_ = from_here;
174  pending_task_ = task;
175  last_timestamp_ = timestamp;
176
177  return true;
178}
179
180
181BalancedMediaTaskRunnerFactory::BalancedMediaTaskRunnerFactory(
182    base::TimeDelta max_delta)
183  : max_delta_(max_delta) {
184}
185
186BalancedMediaTaskRunnerFactory::~BalancedMediaTaskRunnerFactory() {
187}
188
189scoped_refptr<MediaTaskRunner>
190BalancedMediaTaskRunnerFactory::CreateMediaTaskRunner(
191    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) {
192  scoped_refptr<BalancedMediaTaskRunner> media_task_runner(
193      new BalancedMediaTaskRunner(task_runner));
194  scoped_refptr<MediaTaskRunnerWithNotification> media_task_runner_wrapper(
195      new MediaTaskRunnerWithNotification(
196          media_task_runner,
197          base::Bind(&BalancedMediaTaskRunnerFactory::OnNewTask, this),
198          base::Bind(
199              &BalancedMediaTaskRunnerFactory::UnregisterMediaTaskRunner,
200              this, media_task_runner)));
201  base::AutoLock auto_lock(lock_);
202  // Note that |media_task_runner| is inserted here and
203  // not |media_task_runner_wrapper|. Otherwise, we would always have one
204  // ref on |media_task_runner_wrapper| and would never get the release
205  // notification.
206  // When |media_task_runner_wrapper| is going away,
207  // BalancedMediaTaskRunnerFactory will receive a notification and will in
208  // turn remove |media_task_runner|.
209  task_runners_.insert(media_task_runner);
210  return media_task_runner_wrapper;
211}
212
213void BalancedMediaTaskRunnerFactory::OnNewTask() {
214  typedef
215      std::multimap<base::TimeDelta, scoped_refptr<BalancedMediaTaskRunner> >
216      TaskRunnerMap;
217  TaskRunnerMap runnable_task_runner;
218
219  base::AutoLock auto_lock(lock_);
220
221  // Get the minimum timestamp among all streams.
222  for (MediaTaskRunnerSet::const_iterator it = task_runners_.begin();
223       it != task_runners_.end(); ++it) {
224    base::TimeDelta timestamp((*it)->GetMediaTimestamp());
225    if (timestamp == ::media::kNoTimestamp())
226      continue;
227    runnable_task_runner.insert(
228        std::pair<base::TimeDelta, scoped_refptr<BalancedMediaTaskRunner> >(
229            timestamp, *it));
230  }
231
232  // If there is no media task, just returns.
233  if (runnable_task_runner.empty())
234    return;
235
236  // Run tasks which meet the balancing criteria.
237  base::TimeDelta min_timestamp(runnable_task_runner.begin()->first);
238  base::TimeDelta max_timestamp = min_timestamp + max_delta_;
239  for (TaskRunnerMap::iterator it = runnable_task_runner.begin();
240       it != runnable_task_runner.end(); ++it) {
241    (*it).second->ScheduleWork(max_timestamp);
242  }
243}
244
245void BalancedMediaTaskRunnerFactory::UnregisterMediaTaskRunner(
246      const scoped_refptr<BalancedMediaTaskRunner>& media_task_runner) {
247  base::AutoLock auto_lock(lock_);
248  task_runners_.erase(media_task_runner);
249}
250
251}  // namespace media
252}  // namespace chromecast
253