1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "media/blink/buffered_data_source.h"
6
7#include "base/bind.h"
8#include "base/callback_helpers.h"
9#include "base/single_thread_task_runner.h"
10#include "media/base/media_log.h"
11#include "net/base/net_errors.h"
12
13using blink::WebFrame;
14
namespace {

// Starting size, in bytes, of the intermediate buffer that BufferedDataSource
// reads into. 32KB is used because that is a typical FFmpeg read size.
const int kInitialReadBufferSize = 32 * 1024;

// Upper bound on the number of cache-miss retries a single Read() may go
// through before an error is reported.
const int kNumCacheMissRetries = 3;

}  // namespace
27
28namespace media {
29
30class BufferedDataSource::ReadOperation {
31 public:
32  ReadOperation(int64 position, int size, uint8* data,
33                const DataSource::ReadCB& callback);
34  ~ReadOperation();
35
36  // Runs |callback_| with the given |result|, deleting the operation
37  // afterwards.
38  static void Run(scoped_ptr<ReadOperation> read_op, int result);
39
40  // State for the number of times this read operation has been retried.
41  int retries() { return retries_; }
42  void IncrementRetries() { ++retries_; }
43
44  int64 position() { return position_; }
45  int size() { return size_; }
46  uint8* data() { return data_; }
47
48 private:
49  int retries_;
50
51  const int64 position_;
52  const int size_;
53  uint8* data_;
54  DataSource::ReadCB callback_;
55
56  DISALLOW_IMPLICIT_CONSTRUCTORS(ReadOperation);
57};
58
59BufferedDataSource::ReadOperation::ReadOperation(
60    int64 position, int size, uint8* data,
61    const DataSource::ReadCB& callback)
62    : retries_(0),
63      position_(position),
64      size_(size),
65      data_(data),
66      callback_(callback) {
67  DCHECK(!callback_.is_null());
68}
69
70BufferedDataSource::ReadOperation::~ReadOperation() {
71  DCHECK(callback_.is_null());
72}
73
74// static
75void BufferedDataSource::ReadOperation::Run(
76    scoped_ptr<ReadOperation> read_op, int result) {
77  base::ResetAndReturn(&read_op->callback_).Run(result);
78}
79
80BufferedDataSource::BufferedDataSource(
81    const GURL& url,
82    BufferedResourceLoader::CORSMode cors_mode,
83    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
84    WebFrame* frame,
85    MediaLog* media_log,
86    BufferedDataSourceHost* host,
87    const DownloadingCB& downloading_cb)
88    : url_(url),
89      cors_mode_(cors_mode),
90      total_bytes_(kPositionNotSpecified),
91      streaming_(false),
92      frame_(frame),
93      intermediate_read_buffer_(kInitialReadBufferSize),
94      render_task_runner_(task_runner),
95      stop_signal_received_(false),
96      media_has_played_(false),
97      preload_(AUTO),
98      bitrate_(0),
99      playback_rate_(0.0),
100      media_log_(media_log),
101      host_(host),
102      downloading_cb_(downloading_cb),
103      weak_factory_(this) {
104  DCHECK(host_);
105  DCHECK(!downloading_cb_.is_null());
106}
107
108BufferedDataSource::~BufferedDataSource() {}
109
110// A factory method to create BufferedResourceLoader using the read parameters.
111// This method can be overridden to inject mock BufferedResourceLoader object
112// for testing purpose.
113BufferedResourceLoader* BufferedDataSource::CreateResourceLoader(
114    int64 first_byte_position, int64 last_byte_position) {
115  DCHECK(render_task_runner_->BelongsToCurrentThread());
116
117  BufferedResourceLoader::DeferStrategy strategy = preload_ == METADATA ?
118      BufferedResourceLoader::kReadThenDefer :
119      BufferedResourceLoader::kCapacityDefer;
120
121  return new BufferedResourceLoader(url_,
122                                    cors_mode_,
123                                    first_byte_position,
124                                    last_byte_position,
125                                    strategy,
126                                    bitrate_,
127                                    playback_rate_,
128                                    media_log_.get());
129}
130
131void BufferedDataSource::Initialize(const InitializeCB& init_cb) {
132  DCHECK(render_task_runner_->BelongsToCurrentThread());
133  DCHECK(!init_cb.is_null());
134  DCHECK(!loader_.get());
135
136  init_cb_ = init_cb;
137
138  if (url_.SchemeIsHTTPOrHTTPS()) {
139    // Do an unbounded range request starting at the beginning.  If the server
140    // responds with 200 instead of 206 we'll fall back into a streaming mode.
141    loader_.reset(CreateResourceLoader(0, kPositionNotSpecified));
142  } else {
143    // For all other protocols, assume they support range request. We fetch
144    // the full range of the resource to obtain the instance size because
145    // we won't be served HTTP headers.
146    loader_.reset(CreateResourceLoader(kPositionNotSpecified,
147                                       kPositionNotSpecified));
148  }
149
150  base::WeakPtr<BufferedDataSource> weak_this = weak_factory_.GetWeakPtr();
151  loader_->Start(
152      base::Bind(&BufferedDataSource::StartCallback, weak_this),
153      base::Bind(&BufferedDataSource::LoadingStateChangedCallback, weak_this),
154      base::Bind(&BufferedDataSource::ProgressCallback, weak_this),
155      frame_);
156}
157
158void BufferedDataSource::SetPreload(Preload preload) {
159  DCHECK(render_task_runner_->BelongsToCurrentThread());
160  preload_ = preload;
161}
162
163bool BufferedDataSource::HasSingleOrigin() {
164  DCHECK(render_task_runner_->BelongsToCurrentThread());
165  DCHECK(init_cb_.is_null() && loader_.get())
166      << "Initialize() must complete before calling HasSingleOrigin()";
167  return loader_->HasSingleOrigin();
168}
169
170bool BufferedDataSource::DidPassCORSAccessCheck() const {
171  return loader_.get() && loader_->DidPassCORSAccessCheck();
172}
173
174void BufferedDataSource::Abort() {
175  DCHECK(render_task_runner_->BelongsToCurrentThread());
176  {
177    base::AutoLock auto_lock(lock_);
178    StopInternal_Locked();
179  }
180  StopLoader();
181  frame_ = NULL;
182}
183
184void BufferedDataSource::MediaPlaybackRateChanged(float playback_rate) {
185  DCHECK(render_task_runner_->BelongsToCurrentThread());
186  DCHECK(loader_.get());
187
188  if (playback_rate < 0.0f)
189    return;
190
191  playback_rate_ = playback_rate;
192  loader_->SetPlaybackRate(playback_rate);
193}
194
195void BufferedDataSource::MediaIsPlaying() {
196  DCHECK(render_task_runner_->BelongsToCurrentThread());
197  media_has_played_ = true;
198  UpdateDeferStrategy(false);
199}
200
201void BufferedDataSource::MediaIsPaused() {
202  DCHECK(render_task_runner_->BelongsToCurrentThread());
203  UpdateDeferStrategy(true);
204}
205
206/////////////////////////////////////////////////////////////////////////////
207// DataSource implementation.
208void BufferedDataSource::Stop() {
209  {
210    base::AutoLock auto_lock(lock_);
211    StopInternal_Locked();
212  }
213
214  render_task_runner_->PostTask(
215      FROM_HERE,
216      base::Bind(&BufferedDataSource::StopLoader, weak_factory_.GetWeakPtr()));
217}
218
219void BufferedDataSource::SetBitrate(int bitrate) {
220  render_task_runner_->PostTask(FROM_HERE,
221                         base::Bind(&BufferedDataSource::SetBitrateTask,
222                                    weak_factory_.GetWeakPtr(),
223                                    bitrate));
224}
225
226void BufferedDataSource::Read(
227    int64 position, int size, uint8* data,
228    const DataSource::ReadCB& read_cb) {
229  DVLOG(1) << "Read: " << position << " offset, " << size << " bytes";
230  DCHECK(!read_cb.is_null());
231
232  {
233    base::AutoLock auto_lock(lock_);
234    DCHECK(!read_op_);
235
236    if (stop_signal_received_) {
237      read_cb.Run(kReadError);
238      return;
239    }
240
241    read_op_.reset(new ReadOperation(position, size, data, read_cb));
242  }
243
244  render_task_runner_->PostTask(
245      FROM_HERE,
246      base::Bind(&BufferedDataSource::ReadTask, weak_factory_.GetWeakPtr()));
247}
248
249bool BufferedDataSource::GetSize(int64* size_out) {
250  if (total_bytes_ != kPositionNotSpecified) {
251    *size_out = total_bytes_;
252    return true;
253  }
254  *size_out = 0;
255  return false;
256}
257
258bool BufferedDataSource::IsStreaming() {
259  return streaming_;
260}
261
262/////////////////////////////////////////////////////////////////////////////
263// Render thread tasks.
264void BufferedDataSource::ReadTask() {
265  DCHECK(render_task_runner_->BelongsToCurrentThread());
266  ReadInternal();
267}
268
269void BufferedDataSource::StopInternal_Locked() {
270  lock_.AssertAcquired();
271  if (stop_signal_received_)
272    return;
273
274  stop_signal_received_ = true;
275
276  // Initialize() isn't part of the DataSource interface so don't call it in
277  // response to Stop().
278  init_cb_.Reset();
279
280  if (read_op_)
281    ReadOperation::Run(read_op_.Pass(), kReadError);
282}
283
284void BufferedDataSource::StopLoader() {
285  DCHECK(render_task_runner_->BelongsToCurrentThread());
286
287  if (loader_)
288    loader_->Stop();
289}
290
291void BufferedDataSource::SetBitrateTask(int bitrate) {
292  DCHECK(render_task_runner_->BelongsToCurrentThread());
293  DCHECK(loader_.get());
294
295  bitrate_ = bitrate;
296  loader_->SetBitrate(bitrate);
297}
298
299// This method is the place where actual read happens, |loader_| must be valid
300// prior to make this method call.
301void BufferedDataSource::ReadInternal() {
302  DCHECK(render_task_runner_->BelongsToCurrentThread());
303  int64 position = 0;
304  int size = 0;
305  {
306    base::AutoLock auto_lock(lock_);
307    if (stop_signal_received_)
308      return;
309
310    position = read_op_->position();
311    size = read_op_->size();
312  }
313
314  // First we prepare the intermediate read buffer for BufferedResourceLoader
315  // to write to.
316  if (static_cast<int>(intermediate_read_buffer_.size()) < size)
317    intermediate_read_buffer_.resize(size);
318
319  // Perform the actual read with BufferedResourceLoader.
320  DCHECK(!intermediate_read_buffer_.empty());
321  loader_->Read(position,
322                size,
323                &intermediate_read_buffer_[0],
324                base::Bind(&BufferedDataSource::ReadCallback,
325                           weak_factory_.GetWeakPtr()));
326}
327
328
329/////////////////////////////////////////////////////////////////////////////
330// BufferedResourceLoader callback methods.
331void BufferedDataSource::StartCallback(
332    BufferedResourceLoader::Status status) {
333  DCHECK(render_task_runner_->BelongsToCurrentThread());
334  DCHECK(loader_.get());
335
336  bool init_cb_is_null = false;
337  {
338    base::AutoLock auto_lock(lock_);
339    init_cb_is_null = init_cb_.is_null();
340  }
341  if (init_cb_is_null) {
342    loader_->Stop();
343    return;
344  }
345
346  // All responses must be successful. Resources that are assumed to be fully
347  // buffered must have a known content length.
348  bool success = status == BufferedResourceLoader::kOk &&
349                 (!assume_fully_buffered() ||
350                  loader_->instance_size() != kPositionNotSpecified);
351
352  if (success) {
353    total_bytes_ = loader_->instance_size();
354    streaming_ =
355        !assume_fully_buffered() &&
356        (total_bytes_ == kPositionNotSpecified || !loader_->range_supported());
357
358    media_log_->SetDoubleProperty("total_bytes",
359                                  static_cast<double>(total_bytes_));
360    media_log_->SetBooleanProperty("streaming", streaming_);
361  } else {
362    loader_->Stop();
363  }
364
365  // TODO(scherkus): we shouldn't have to lock to signal host(), see
366  // http://crbug.com/113712 for details.
367  base::AutoLock auto_lock(lock_);
368  if (stop_signal_received_)
369    return;
370
371  if (success) {
372    if (total_bytes_ != kPositionNotSpecified) {
373      host_->SetTotalBytes(total_bytes_);
374      if (assume_fully_buffered())
375        host_->AddBufferedByteRange(0, total_bytes_);
376    }
377
378    media_log_->SetBooleanProperty("single_origin", loader_->HasSingleOrigin());
379    media_log_->SetBooleanProperty("passed_cors_access_check",
380                                   loader_->DidPassCORSAccessCheck());
381    media_log_->SetBooleanProperty("range_header_supported",
382                                   loader_->range_supported());
383  }
384
385  base::ResetAndReturn(&init_cb_).Run(success);
386}
387
388void BufferedDataSource::PartialReadStartCallback(
389    BufferedResourceLoader::Status status) {
390  DCHECK(render_task_runner_->BelongsToCurrentThread());
391  DCHECK(loader_.get());
392
393  if (status == BufferedResourceLoader::kOk) {
394    // Once the request has started successfully, we can proceed with
395    // reading from it.
396    ReadInternal();
397    return;
398  }
399
400  // Stop the resource loader since we have received an error.
401  loader_->Stop();
402
403  // TODO(scherkus): we shouldn't have to lock to signal host(), see
404  // http://crbug.com/113712 for details.
405  base::AutoLock auto_lock(lock_);
406  if (stop_signal_received_)
407    return;
408  ReadOperation::Run(read_op_.Pass(), kReadError);
409}
410
411void BufferedDataSource::ReadCallback(
412    BufferedResourceLoader::Status status,
413    int bytes_read) {
414  DCHECK(render_task_runner_->BelongsToCurrentThread());
415
416  // TODO(scherkus): we shouldn't have to lock to signal host(), see
417  // http://crbug.com/113712 for details.
418  base::AutoLock auto_lock(lock_);
419  if (stop_signal_received_)
420    return;
421
422  if (status != BufferedResourceLoader::kOk) {
423    // Stop the resource load if it failed.
424    loader_->Stop();
425
426    if (status == BufferedResourceLoader::kCacheMiss &&
427        read_op_->retries() < kNumCacheMissRetries) {
428      read_op_->IncrementRetries();
429
430      // Recreate a loader starting from where we last left off until the
431      // end of the resource.
432      loader_.reset(CreateResourceLoader(
433          read_op_->position(), kPositionNotSpecified));
434
435      base::WeakPtr<BufferedDataSource> weak_this = weak_factory_.GetWeakPtr();
436      loader_->Start(
437          base::Bind(&BufferedDataSource::PartialReadStartCallback, weak_this),
438          base::Bind(&BufferedDataSource::LoadingStateChangedCallback,
439                     weak_this),
440          base::Bind(&BufferedDataSource::ProgressCallback, weak_this),
441          frame_);
442      return;
443    }
444
445    ReadOperation::Run(read_op_.Pass(), kReadError);
446    return;
447  }
448
449  if (bytes_read > 0) {
450    DCHECK(!intermediate_read_buffer_.empty());
451    memcpy(read_op_->data(), &intermediate_read_buffer_[0], bytes_read);
452  } else if (bytes_read == 0 && total_bytes_ == kPositionNotSpecified) {
453    // We've reached the end of the file and we didn't know the total size
454    // before. Update the total size so Read()s past the end of the file will
455    // fail like they would if we had known the file size at the beginning.
456    total_bytes_ = loader_->instance_size();
457
458    if (total_bytes_ != kPositionNotSpecified) {
459      host_->SetTotalBytes(total_bytes_);
460      host_->AddBufferedByteRange(loader_->first_byte_position(),
461                                  total_bytes_);
462    }
463  }
464  ReadOperation::Run(read_op_.Pass(), bytes_read);
465}
466
467void BufferedDataSource::LoadingStateChangedCallback(
468    BufferedResourceLoader::LoadingState state) {
469  DCHECK(render_task_runner_->BelongsToCurrentThread());
470
471  if (assume_fully_buffered())
472    return;
473
474  bool is_downloading_data;
475  switch (state) {
476    case BufferedResourceLoader::kLoading:
477      is_downloading_data = true;
478      break;
479    case BufferedResourceLoader::kLoadingDeferred:
480    case BufferedResourceLoader::kLoadingFinished:
481      is_downloading_data = false;
482      break;
483
484    // TODO(scherkus): we don't signal network activity changes when loads
485    // fail to preserve existing behaviour when deferring is toggled, however
486    // we should consider changing DownloadingCB to also propagate loading
487    // state. For example there isn't any signal today to notify the client that
488    // loading has failed (we only get errors on subsequent reads).
489    case BufferedResourceLoader::kLoadingFailed:
490      return;
491  }
492
493  downloading_cb_.Run(is_downloading_data);
494}
495
496void BufferedDataSource::ProgressCallback(int64 position) {
497  DCHECK(render_task_runner_->BelongsToCurrentThread());
498
499  if (assume_fully_buffered())
500    return;
501
502  // TODO(scherkus): we shouldn't have to lock to signal host(), see
503  // http://crbug.com/113712 for details.
504  base::AutoLock auto_lock(lock_);
505  if (stop_signal_received_)
506    return;
507
508  host_->AddBufferedByteRange(loader_->first_byte_position(), position);
509}
510
511void BufferedDataSource::UpdateDeferStrategy(bool paused) {
512  // No need to aggressively buffer when we are assuming the resource is fully
513  // buffered.
514  if (assume_fully_buffered()) {
515    loader_->UpdateDeferStrategy(BufferedResourceLoader::kCapacityDefer);
516    return;
517  }
518
519  // If the playback has started (at which point the preload value is ignored)
520  // and we're paused, then try to load as much as possible (the loader will
521  // fall back to kCapacityDefer if it knows the current response won't be
522  // useful from the cache in the future).
523  if (media_has_played_ && paused && loader_->range_supported()) {
524    loader_->UpdateDeferStrategy(BufferedResourceLoader::kNeverDefer);
525    return;
526  }
527
528  // If media is currently playing or the page indicated preload=auto or the
529  // the server does not support the byte range request or we do not want to go
530  // too far ahead of the read head, use threshold strategy to enable/disable
531  // deferring when the buffer is full/depleted.
532  loader_->UpdateDeferStrategy(BufferedResourceLoader::kCapacityDefer);
533}
534
535}  // namespace media
536