buffered_data_source.cc revision 513209b27ff55e2841eac0e4120199c23acce758
1// Copyright (c) 2010 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/callback.h"
6#include "base/compiler_specific.h"
7#include "base/message_loop.h"
8#include "base/process_util.h"
9#include "base/stl_util-inl.h"
10#include "base/string_util.h"
11#include "media/base/filter_host.h"
12#include "media/base/media_format.h"
13#include "net/base/load_flags.h"
14#include "net/base/net_errors.h"
15#include "net/http/http_response_headers.h"
16#include "webkit/glue/media/buffered_data_source.h"
17#include "webkit/glue/webkit_glue.h"
18#include "webkit/glue/webmediaplayer_impl.h"
19
20namespace {
21
22const char kHttpScheme[] = "http";
23const char kHttpsScheme[] = "https";
24const char kDataScheme[] = "data";
25const int64 kPositionNotSpecified = -1;
26const int kHttpOK = 200;
27const int kHttpPartialContent = 206;
28
29// Define the number of bytes in a megabyte.
30const size_t kMegabyte = 1024 * 1024;
31
32// Backward capacity of the buffer, by default 2MB.
33const size_t kBackwardCapcity = 2 * kMegabyte;
34
35// Forward capacity of the buffer, by default 10MB.
36const size_t kForwardCapacity = 10 * kMegabyte;
37
38// The threshold of bytes that we should wait until the data arrives in the
39// future instead of restarting a new connection. This number is defined in the
40// number of bytes, we should determine this value from typical connection speed
41// and amount of time for a suitable wait. Now I just make a guess for this
42// number to be 2MB.
43// TODO(hclam): determine a better value for this.
44const int kForwardWaitThreshold = 2 * kMegabyte;
45
46// Defines how long we should wait for more data before we declare a connection
47// timeout and start a new request.
48// TODO(hclam): Set it to 5s, calibrate this value later.
49const int kTimeoutMilliseconds = 5000;
50
51// Defines how many times we should try to read from a buffered resource loader
52// before we declare a read error. After each failure of read from a buffered
53// resource loader, a new one is created to be read.
54const int kReadTrials = 3;
55
56// BufferedDataSource has an intermediate buffer, this value governs the initial
57// size of that buffer. It is set to 32KB because this is a typical read size
58// of FFmpeg.
59const int kInitialReadBufferSize = 32768;
60
61// Returns true if |url| operates on HTTP protocol.
62bool IsHttpProtocol(const GURL& url) {
63  return url.SchemeIs(kHttpScheme) || url.SchemeIs(kHttpsScheme);
64}
65
66bool IsDataProtocol(const GURL& url) {
67  return url.SchemeIs(kDataScheme);
68}
69
70}  // namespace
71
72namespace webkit_glue {
73/////////////////////////////////////////////////////////////////////////////
74// BufferedResourceLoader
75BufferedResourceLoader::BufferedResourceLoader(
76    webkit_glue::MediaResourceLoaderBridgeFactory* bridge_factory,
77    const GURL& url,
78    int64 first_byte_position,
79    int64 last_byte_position)
80    : buffer_(new media::SeekableBuffer(kBackwardCapcity, kForwardCapacity)),
81      deferred_(false),
82      defer_allowed_(true),
83      completed_(false),
84      range_requested_(false),
85      partial_response_(false),
86      bridge_factory_(bridge_factory),
87      url_(url),
88      first_byte_position_(first_byte_position),
89      last_byte_position_(last_byte_position),
90      start_callback_(NULL),
91      bridge_(NULL),
92      offset_(0),
93      content_length_(kPositionNotSpecified),
94      instance_size_(kPositionNotSpecified),
95      read_callback_(NULL),
96      read_position_(0),
97      read_size_(0),
98      read_buffer_(NULL),
99      first_offset_(0),
100      last_offset_(0) {
101}
102
103BufferedResourceLoader::~BufferedResourceLoader() {
104}
105
106void BufferedResourceLoader::Start(net::CompletionCallback* start_callback,
107                                   NetworkEventCallback* event_callback) {
108  // Make sure we have not started.
109  DCHECK(!bridge_.get());
110  DCHECK(!start_callback_.get());
111  DCHECK(!event_callback_.get());
112  DCHECK(start_callback);
113  DCHECK(event_callback);
114
115  start_callback_.reset(start_callback);
116  event_callback_.reset(event_callback);
117
118  if (first_byte_position_ != kPositionNotSpecified) {
119    range_requested_ = true;
120    // TODO(hclam): server may not support range request so |offset_| may not
121    // equal to |first_byte_position_|.
122    offset_ = first_byte_position_;
123  }
124
125  // Creates the bridge on render thread since we can only access
126  // ResourceDispatcher on this thread.
127  bridge_.reset(
128      bridge_factory_->CreateBridge(
129          url_,
130          net::LOAD_NORMAL,
131          first_byte_position_,
132          last_byte_position_));
133
134  // Increment the reference count right before we start the request. This
135  // reference will be release when this request has ended.
136  AddRef();
137
138  // And start the resource loading.
139  bridge_->Start(this);
140}
141
142void BufferedResourceLoader::Stop() {
143  // Reset callbacks.
144  start_callback_.reset();
145  event_callback_.reset();
146  read_callback_.reset();
147
148  // Use the internal buffer to signal that we have been stopped.
149  // TODO(hclam): Not so pretty to do this.
150  if (!buffer_.get())
151    return;
152
153  // Destroy internal buffer.
154  buffer_.reset();
155
156  if (bridge_.get()) {
157    // Cancel the request. This method call will cancel the request
158    // asynchronously. We may still get data or messages until we receive
159    // a response completed message.
160    if (deferred_)
161      bridge_->SetDefersLoading(false);
162    deferred_ = false;
163    bridge_->Cancel();
164  }
165}
166
167void BufferedResourceLoader::Read(int64 position,
168                                  int read_size,
169                                  uint8* buffer,
170                                  net::CompletionCallback* read_callback) {
171  DCHECK(!read_callback_.get());
172  DCHECK(buffer_.get());
173  DCHECK(read_callback);
174  DCHECK(buffer);
175
176  // Save the parameter of reading.
177  read_callback_.reset(read_callback);
178  read_position_ = position;
179  read_size_ = read_size;
180  read_buffer_ = buffer;
181
182  // If read position is beyond the instance size, we cannot read there.
183  if (instance_size_ != kPositionNotSpecified &&
184      instance_size_ <= read_position_) {
185    DoneRead(0);
186    return;
187  }
188
189  // Make sure |offset_| and |read_position_| does not differ by a large
190  // amount.
191  if (read_position_ > offset_ + kint32max ||
192      read_position_ < offset_ + kint32min) {
193    DoneRead(net::ERR_CACHE_MISS);
194    return;
195  }
196
197  // Prepare the parameters.
198  first_offset_ = static_cast<int>(read_position_ - offset_);
199  last_offset_ = first_offset_ + read_size_;
200
201  // If we can serve the request now, do the actual read.
202  if (CanFulfillRead()) {
203    ReadInternal();
204    DisableDeferIfNeeded();
205    return;
206  }
207
208  // If we expected the read request to be fulfilled later, returns
209  // immediately and let more data to flow in.
210  if (WillFulfillRead())
211    return;
212
213  // Make a callback to report failure.
214  DoneRead(net::ERR_CACHE_MISS);
215}
216
217int64 BufferedResourceLoader::GetBufferedFirstBytePosition() {
218  if (buffer_.get())
219    return offset_ - static_cast<int>(buffer_->backward_bytes());
220  return kPositionNotSpecified;
221}
222
223int64 BufferedResourceLoader::GetBufferedLastBytePosition() {
224  if (buffer_.get())
225    return offset_ + static_cast<int>(buffer_->forward_bytes()) - 1;
226  return kPositionNotSpecified;
227}
228
229void BufferedResourceLoader::SetAllowDefer(bool is_allowed) {
230  defer_allowed_ = is_allowed;
231  DisableDeferIfNeeded();
232}
233
234/////////////////////////////////////////////////////////////////////////////
235// BufferedResourceLoader,
236//     webkit_glue::ResourceLoaderBridge::Peer implementations
237bool BufferedResourceLoader::OnReceivedRedirect(
238    const GURL& new_url,
239    const webkit_glue::ResourceResponseInfo& info,
240    bool* has_new_first_party_for_cookies,
241    GURL* new_first_party_for_cookies) {
242  DCHECK(bridge_.get());
243
244  // Save the new URL.
245  url_ = new_url;
246  // TODO(wtc): should we return a new first party for cookies URL?
247  *has_new_first_party_for_cookies = false;
248
249  // The load may have been stopped and |start_callback| is destroyed.
250  // In this case we shouldn't do anything.
251  if (!start_callback_.get())
252    return true;
253
254  if (!IsProtocolSupportedForMedia(new_url)) {
255    DoneStart(net::ERR_ADDRESS_INVALID);
256    Stop();
257    return false;
258  }
259  return true;
260}
261
262void BufferedResourceLoader::OnReceivedResponse(
263    const webkit_glue::ResourceResponseInfo& info,
264    bool content_filtered) {
265  DCHECK(bridge_.get());
266
267  // The loader may have been stopped and |start_callback| is destroyed.
268  // In this case we shouldn't do anything.
269  if (!start_callback_.get())
270    return;
271
272  // We make a strong assumption that when we reach here we have either
273  // received a response from HTTP/HTTPS protocol or the request was
274  // successful (in particular range request). So we only verify the partial
275  // response for HTTP and HTTPS protocol.
276  if (IsHttpProtocol(url_)) {
277    int error = net::OK;
278    if (!info.headers) {
279      // We expect to receive headers because this is a HTTP or HTTPS protocol,
280      // if not report failure.
281      error = net::ERR_INVALID_RESPONSE;
282    } else {
283      if (info.headers->response_code() == kHttpPartialContent)
284        partial_response_ = true;
285
286      if (range_requested_ && partial_response_) {
287        // If we have verified the partial response and it is correct, we will
288        // return net::OK.
289        if (!VerifyPartialResponse(info))
290          error = net::ERR_INVALID_RESPONSE;
291      } else if (info.headers->response_code() != kHttpOK) {
292        // We didn't request a range but server didn't reply with "200 OK".
293        error = net::ERR_FAILED;
294      }
295    }
296
297    if (error != net::OK) {
298      DoneStart(error);
299      Stop();
300      return;
301    }
302  } else {
303    // For any protocol other than HTTP and HTTPS, assume range request is
304    // always fulfilled.
305    partial_response_ = range_requested_;
306  }
307
308  // |info.content_length| can be -1, in that case |content_length_| is
309  // not specified and this is a streaming response.
310  content_length_ = info.content_length;
311
312  // If we have not requested a range, then the size of the instance is equal
313  // to the content length.
314  if (!partial_response_)
315    instance_size_ = content_length_;
316
317  // Calls with a successful response.
318  DoneStart(net::OK);
319}
320
321void BufferedResourceLoader::OnReceivedData(const char* data, int len) {
322  DCHECK(bridge_.get());
323
324  // If this loader has been stopped, |buffer_| would be destroyed.
325  // In this case we shouldn't do anything.
326  if (!buffer_.get())
327    return;
328
329  // Writes more data to |buffer_|.
330  buffer_->Append(reinterpret_cast<const uint8*>(data), len);
331
332  // If there is an active read request, try to fulfill the request.
333  if (HasPendingRead() && CanFulfillRead()) {
334    ReadInternal();
335  } else if (!defer_allowed_) {
336    // If we're not allowed to defer, slide the buffer window forward instead
337    // of deferring.
338    if (buffer_->forward_bytes() > buffer_->forward_capacity()) {
339      size_t excess = buffer_->forward_bytes() - buffer_->forward_capacity();
340      bool success = buffer_->Seek(excess);
341      DCHECK(success);
342      offset_ += first_offset_ + excess;
343    }
344  }
345
346  // At last see if the buffer is full and we need to defer the downloading.
347  EnableDeferIfNeeded();
348
349  // Notify that we have received some data.
350  NotifyNetworkEvent();
351}
352
353void BufferedResourceLoader::OnCompletedRequest(
354    const URLRequestStatus& status,
355    const std::string& security_info,
356    const base::Time& completion_time) {
357  DCHECK(bridge_.get());
358
359  // Saves the information that the request has completed.
360  completed_ = true;
361
362  // If there is a start callback, calls it.
363  if (start_callback_.get()) {
364    DoneStart(status.os_error());
365  }
366
367  // If there is a pending read but the request has ended, returns with what
368  // we have.
369  if (HasPendingRead()) {
370    // Make sure we have a valid buffer before we satisfy a read request.
371    DCHECK(buffer_.get());
372
373    if (status.is_success()) {
374      // Try to fulfill with what is in the buffer.
375      if (CanFulfillRead())
376        ReadInternal();
377      else
378        DoneRead(net::ERR_CACHE_MISS);
379    } else {
380      // If the request has failed, then fail the read.
381      DoneRead(net::ERR_FAILED);
382    }
383  }
384
385  // There must not be any outstanding read request.
386  DCHECK(!HasPendingRead());
387
388  // Notify that network response is completed.
389  NotifyNetworkEvent();
390
391  // We incremented the reference count when the loader was started. We balance
392  // that reference here so that we get destroyed. This is also the only safe
393  // place to destroy the ResourceLoaderBridge.
394  bridge_.reset();
395  Release();
396}
397
398/////////////////////////////////////////////////////////////////////////////
399// BufferedResourceLoader, private
400void BufferedResourceLoader::EnableDeferIfNeeded() {
401  if (!defer_allowed_)
402    return;
403
404  if (!deferred_ &&
405      buffer_->forward_bytes() >= buffer_->forward_capacity()) {
406    deferred_ = true;
407
408    if (bridge_.get())
409      bridge_->SetDefersLoading(true);
410
411    NotifyNetworkEvent();
412  }
413}
414
415void BufferedResourceLoader::DisableDeferIfNeeded() {
416  if (deferred_ &&
417      (!defer_allowed_ ||
418       buffer_->forward_bytes() < buffer_->forward_capacity() / 2)) {
419    deferred_ = false;
420
421    if (bridge_.get())
422      bridge_->SetDefersLoading(false);
423
424    NotifyNetworkEvent();
425  }
426}
427
428bool BufferedResourceLoader::CanFulfillRead() {
429  // If we are reading too far in the backward direction.
430  if (first_offset_ < 0 &&
431      first_offset_ + static_cast<int>(buffer_->backward_bytes()) < 0)
432    return false;
433
434  // If the start offset is too far ahead.
435  if (first_offset_ >= static_cast<int>(buffer_->forward_bytes()))
436    return false;
437
438  // At the point, we verified that first byte requested is within the buffer.
439  // If the request has completed, then just returns with what we have now.
440  if (completed_)
441    return true;
442
443  // If the resource request is still active, make sure the whole requested
444  // range is covered.
445  if (last_offset_ > static_cast<int>(buffer_->forward_bytes()))
446    return false;
447
448  return true;
449}
450
451bool BufferedResourceLoader::WillFulfillRead() {
452  // Reading too far in the backward direction.
453  if (first_offset_ < 0 &&
454      first_offset_ + static_cast<int>(buffer_->backward_bytes()) < 0)
455    return false;
456
457  // Try to read too far ahead.
458  if (last_offset_ > kForwardWaitThreshold)
459    return false;
460
461  // The resource request has completed, there's no way we can fulfill the
462  // read request.
463  if (completed_)
464    return false;
465
466  return true;
467}
468
469void BufferedResourceLoader::ReadInternal() {
470  // Seek to the first byte requested.
471  bool ret = buffer_->Seek(first_offset_);
472  DCHECK(ret);
473
474  // Then do the read.
475  int read = static_cast<int>(buffer_->Read(read_buffer_, read_size_));
476  offset_ += first_offset_ + read;
477
478  // And report with what we have read.
479  DoneRead(read);
480}
481
482bool BufferedResourceLoader::VerifyPartialResponse(
483    const ResourceResponseInfo& info) {
484  int64 first_byte_position, last_byte_position, instance_size;
485  if (!info.headers->GetContentRange(&first_byte_position,
486                                     &last_byte_position,
487                                     &instance_size)) {
488    return false;
489  }
490
491  if (instance_size != kPositionNotSpecified)
492    instance_size_ = instance_size;
493
494  if (first_byte_position_ != -1 &&
495      first_byte_position_ != first_byte_position) {
496    return false;
497  }
498
499  // TODO(hclam): I should also check |last_byte_position|, but since
500  // we will never make such a request that it is ok to leave it unimplemented.
501  return true;
502}
503
504void BufferedResourceLoader::DoneRead(int error) {
505  read_callback_->RunWithParams(Tuple1<int>(error));
506  read_callback_.reset();
507  read_position_ = 0;
508  read_size_ = 0;
509  read_buffer_ = NULL;
510  first_offset_ = 0;
511  last_offset_ = 0;
512}
513
514void BufferedResourceLoader::DoneStart(int error) {
515  start_callback_->RunWithParams(Tuple1<int>(error));
516  start_callback_.reset();
517}
518
519void BufferedResourceLoader::NotifyNetworkEvent() {
520  if (event_callback_.get())
521    event_callback_->Run();
522}
523
524/////////////////////////////////////////////////////////////////////////////
525// BufferedDataSource
526BufferedDataSource::BufferedDataSource(
527    MessageLoop* render_loop,
528    webkit_glue::MediaResourceLoaderBridgeFactory* bridge_factory)
529    : total_bytes_(kPositionNotSpecified),
530      loaded_(false),
531      streaming_(false),
532      single_origin_(true),
533      bridge_factory_(bridge_factory),
534      loader_(NULL),
535      network_activity_(false),
536      initialize_callback_(NULL),
537      read_callback_(NULL),
538      read_position_(0),
539      read_size_(0),
540      read_buffer_(NULL),
541      read_attempts_(0),
542      intermediate_read_buffer_(new uint8[kInitialReadBufferSize]),
543      intermediate_read_buffer_size_(kInitialReadBufferSize),
544      render_loop_(render_loop),
545      stop_signal_received_(false),
546      stopped_on_render_loop_(false),
547      media_is_paused_(true),
548      using_range_request_(true) {
549}
550
551BufferedDataSource::~BufferedDataSource() {
552}
553
554// A factory method to create BufferedResourceLoader using the read parameters.
555// This method can be overrided to inject mock BufferedResourceLoader object
556// for testing purpose.
557BufferedResourceLoader* BufferedDataSource::CreateResourceLoader(
558    int64 first_byte_position, int64 last_byte_position) {
559  DCHECK(MessageLoop::current() == render_loop_);
560
561  return new BufferedResourceLoader(bridge_factory_.get(), url_,
562                                    first_byte_position,
563                                    last_byte_position);
564}
565
566// This method simply returns kTimeoutMilliseconds. The purpose of this
567// method is to be overidded so as to provide a different timeout value
568// for testing purpose.
569base::TimeDelta BufferedDataSource::GetTimeoutMilliseconds() {
570  return base::TimeDelta::FromMilliseconds(kTimeoutMilliseconds);
571}
572
573/////////////////////////////////////////////////////////////////////////////
574// BufferedDataSource, media::MediaFilter implementation
575void BufferedDataSource::Initialize(const std::string& url,
576                                    media::FilterCallback* callback) {
577  // Saves the url.
578  url_ = GURL(url);
579
580  if (!IsProtocolSupportedForMedia(url_)) {
581    // This method is called on the thread where host() lives so it is safe
582    // to make this call.
583    host()->SetError(media::PIPELINE_ERROR_NETWORK);
584    callback->Run();
585    delete callback;
586    return;
587  }
588
589  DCHECK(callback);
590  initialize_callback_.reset(callback);
591
592  media_format_.SetAsString(media::MediaFormat::kMimeType,
593                            media::mime_type::kApplicationOctetStream);
594  media_format_.SetAsString(media::MediaFormat::kURL, url);
595
596  // Post a task to complete the initialization task.
597  render_loop_->PostTask(FROM_HERE,
598      NewRunnableMethod(this, &BufferedDataSource::InitializeTask));
599}
600
601bool BufferedDataSource::IsUrlSupported(const std::string& url) {
602  GURL gurl(url);
603
604  // This data source doesn't support data:// protocol so reject it.
605  return IsProtocolSupportedForMedia(gurl) && !IsDataProtocol(gurl);
606}
607
608void BufferedDataSource::Stop(media::FilterCallback* callback) {
609  {
610    AutoLock auto_lock(lock_);
611    stop_signal_received_ = true;
612  }
613  if (callback) {
614    callback->Run();
615    delete callback;
616  }
617
618  render_loop_->PostTask(FROM_HERE,
619      NewRunnableMethod(this, &BufferedDataSource::CleanupTask));
620}
621
622void BufferedDataSource::SetPlaybackRate(float playback_rate) {
623  render_loop_->PostTask(FROM_HERE,
624      NewRunnableMethod(this, &BufferedDataSource::SetPlaybackRateTask,
625                        playback_rate));
626}
627
628/////////////////////////////////////////////////////////////////////////////
629// BufferedDataSource, media::DataSource implementation
630void BufferedDataSource::Read(int64 position, size_t size, uint8* data,
631                              media::DataSource::ReadCallback* read_callback) {
632  render_loop_->PostTask(FROM_HERE,
633      NewRunnableMethod(this, &BufferedDataSource::ReadTask,
634                        position, static_cast<int>(size), data, read_callback));
635}
636
637bool BufferedDataSource::GetSize(int64* size_out) {
638  if (total_bytes_ != kPositionNotSpecified) {
639    *size_out = total_bytes_;
640    return true;
641  }
642  *size_out = 0;
643  return false;
644}
645
646bool BufferedDataSource::IsStreaming() {
647  return streaming_;
648}
649
650bool BufferedDataSource::HasSingleOrigin() {
651  DCHECK(MessageLoop::current() == render_loop_);
652  return single_origin_;
653}
654
655void BufferedDataSource::Abort() {
656  DCHECK(MessageLoop::current() == render_loop_);
657
658  // If we are told to abort, immediately return from any pending read
659  // with an error.
660  if (read_callback_.get()) {
661    {
662      AutoLock auto_lock(lock_);
663      DoneRead_Locked(net::ERR_FAILED);
664    }
665    CleanupTask();
666  }
667}
668
669/////////////////////////////////////////////////////////////////////////////
670// BufferedDataSource, render thread tasks
671void BufferedDataSource::InitializeTask() {
672  DCHECK(MessageLoop::current() == render_loop_);
673  DCHECK(!loader_.get());
674  DCHECK(!stopped_on_render_loop_);
675
676  // Kick starts the watch dog task that will handle connection timeout.
677  // We run the watch dog 2 times faster the actual timeout so as to catch
678  // the timeout more accurately.
679  watch_dog_timer_.Start(
680      GetTimeoutMilliseconds() / 2,
681      this,
682      &BufferedDataSource::WatchDogTask);
683
684  if (IsHttpProtocol(url_)) {
685    // Fetch only first 1024 bytes as this usually covers the header portion
686    // of a media file that gives enough information about the codecs, etc.
687    // This also serve as a probe to determine server capability to serve
688    // range request.
689    // TODO(hclam): Do some experiments for the best approach.
690    loader_ = CreateResourceLoader(0, 1024);
691    loader_->Start(
692        NewCallback(this, &BufferedDataSource::HttpInitialStartCallback),
693        NewCallback(this, &BufferedDataSource::NetworkEventCallback));
694  } else {
695    // For all other protocols, assume they support range request. We fetch
696    // the full range of the resource to obtain the instance size because
697    // we won't be served HTTP headers.
698    loader_ = CreateResourceLoader(-1, -1);
699    loader_->Start(
700        NewCallback(this, &BufferedDataSource::NonHttpInitialStartCallback),
701        NewCallback(this, &BufferedDataSource::NetworkEventCallback));
702  }
703}
704
705void BufferedDataSource::ReadTask(
706     int64 position, int read_size, uint8* buffer,
707     media::DataSource::ReadCallback* read_callback) {
708  DCHECK(MessageLoop::current() == render_loop_);
709
710  // If CleanupTask() was executed we should return immediately. We check this
711  // variable to prevent doing any actual work after clean up was done. We do
712  // not check |stop_signal_received_| because anything use of it has to be
713  // within |lock_| which is not desirable.
714  if (stopped_on_render_loop_)
715    return;
716
717  DCHECK(!read_callback_.get());
718  DCHECK(read_callback);
719
720  // Saves the read parameters.
721  read_position_ = position;
722  read_size_ = read_size;
723  read_callback_.reset(read_callback);
724  read_buffer_ = buffer;
725  read_submitted_time_ = base::Time::Now();
726  read_attempts_ = 0;
727
728  // Call to read internal to perform the actual read.
729  ReadInternal();
730}
731
732void BufferedDataSource::CleanupTask() {
733  DCHECK(MessageLoop::current() == render_loop_);
734
735  // If we have already stopped, do nothing.
736  if (stopped_on_render_loop_)
737    return;
738
739  // Stop the watch dog.
740  watch_dog_timer_.Stop();
741
742  // We just need to stop the loader, so it stops activity.
743  if (loader_.get())
744    loader_->Stop();
745
746  // Reset the parameters of the current read request.
747  read_callback_.reset();
748  read_position_ = 0;
749  read_size_ = 0;
750  read_buffer_ = 0;
751  read_submitted_time_ = base::Time();
752  read_attempts_ = 0;
753
754  // Signal that stop task has finished execution.
755  stopped_on_render_loop_ = true;
756}
757
758void BufferedDataSource::RestartLoadingTask() {
759  DCHECK(MessageLoop::current() == render_loop_);
760
761  // This variable is set in CleanupTask(). We check this and do an early
762  // return. The sequence of actions which enable this conditions is:
763  // 1. Stop() is called from the pipeline.
764  // 2. ReadCallback() is called from the resource loader.
765  // 3. CleanupTask() is executed.
766  // 4. RestartLoadingTask() is executed.
767  if (stopped_on_render_loop_)
768    return;
769
770  // If there's no outstanding read then return early.
771  if (!read_callback_.get())
772    return;
773
774  loader_ = CreateResourceLoader(read_position_, -1);
775  loader_->SetAllowDefer(!media_is_paused_);
776  loader_->Start(
777      NewCallback(this, &BufferedDataSource::PartialReadStartCallback),
778      NewCallback(this, &BufferedDataSource::NetworkEventCallback));
779}
780
781void BufferedDataSource::WatchDogTask() {
782  DCHECK(MessageLoop::current() == render_loop_);
783  DCHECK(!stopped_on_render_loop_);
784
785  // We only care if there is an active read request.
786  if (!read_callback_.get())
787    return;
788
789  DCHECK(loader_.get());
790  base::TimeDelta delta = base::Time::Now() - read_submitted_time_;
791  if (delta < GetTimeoutMilliseconds())
792    return;
793
794  // TODO(hclam): Maybe raise an error here. But if an error is reported
795  // the whole pipeline may get destroyed...
796  if (read_attempts_ >= kReadTrials)
797    return;
798
799  ++read_attempts_;
800  read_submitted_time_ = base::Time::Now();
801
802  // Stops the current loader and creates a new resource loader and
803  // retry the request.
804  loader_->Stop();
805  loader_ = CreateResourceLoader(read_position_, -1);
806  loader_->SetAllowDefer(!media_is_paused_);
807  loader_->Start(
808      NewCallback(this, &BufferedDataSource::PartialReadStartCallback),
809      NewCallback(this, &BufferedDataSource::NetworkEventCallback));
810}
811
812void BufferedDataSource::SetPlaybackRateTask(float playback_rate) {
813  DCHECK(MessageLoop::current() == render_loop_);
814  DCHECK(loader_.get());
815
816  bool previously_paused = media_is_paused_;
817  media_is_paused_ = (playback_rate == 0.0);
818
819  // Disallow deferring data when we are pausing, allow deferring data
820  // when we resume playing.
821  if (previously_paused && !media_is_paused_) {
822    loader_->SetAllowDefer(true);
823  } else if (!previously_paused && media_is_paused_) {
824    loader_->SetAllowDefer(false);
825  }
826}
827
828// This method is the place where actual read happens, |loader_| must be valid
829// prior to make this method call.
830void BufferedDataSource::ReadInternal() {
831  DCHECK(MessageLoop::current() == render_loop_);
832  DCHECK(loader_.get());
833
834  // First we prepare the intermediate read buffer for BufferedResourceLoader
835  // to write to.
836  if (read_size_ > intermediate_read_buffer_size_) {
837    intermediate_read_buffer_.reset(new uint8[read_size_]);
838  }
839
840  // Perform the actual read with BufferedResourceLoader.
841  loader_->Read(read_position_, read_size_, intermediate_read_buffer_.get(),
842                NewCallback(this, &BufferedDataSource::ReadCallback));
843}
844
845// Method to report the results of the current read request. Also reset all
846// the read parameters.
847void BufferedDataSource::DoneRead_Locked(int error) {
848  DCHECK(MessageLoop::current() == render_loop_);
849  DCHECK(read_callback_.get());
850  lock_.AssertAcquired();
851
852  if (error >= 0) {
853    read_callback_->RunWithParams(Tuple1<size_t>(error));
854  } else {
855    read_callback_->RunWithParams(
856        Tuple1<size_t>(static_cast<size_t>(media::DataSource::kReadError)));
857  }
858
859  read_callback_.reset();
860  read_position_ = 0;
861  read_size_ = 0;
862  read_buffer_ = 0;
863}
864
865void BufferedDataSource::DoneInitialization_Locked() {
866  DCHECK(MessageLoop::current() == render_loop_);
867  DCHECK(initialize_callback_.get());
868  lock_.AssertAcquired();
869
870  initialize_callback_->Run();
871  initialize_callback_.reset();
872}
873
874/////////////////////////////////////////////////////////////////////////////
875// BufferedDataSource, callback methods.
876// These methods are called on the render thread for the events reported by
877// BufferedResourceLoader.
878void BufferedDataSource::HttpInitialStartCallback(int error) {
879  DCHECK(MessageLoop::current() == render_loop_);
880  DCHECK(loader_.get());
881
882  // Check if the request ended up at a different origin via redirect.
883  single_origin_ = url_.GetOrigin() == loader_->url().GetOrigin();
884
885  int64 instance_size = loader_->instance_size();
886  bool partial_response = loader_->partial_response();
887  bool success = error == net::OK;
888
889  if (success) {
890    // TODO(hclam): Needs more thinking about supporting servers without range
891    // request or their partial response is not complete.
892    total_bytes_ = instance_size;
893    loaded_ = false;
894    streaming_ = (instance_size == kPositionNotSpecified) || !partial_response;
895  } else {
896    // TODO(hclam): In case of failure, we can retry several times.
897    loader_->Stop();
898  }
899
900  if (error == net::ERR_INVALID_RESPONSE && using_range_request_) {
901    // Assuming that the Range header was causing the problem. Retry without
902    // the Range header.
903    using_range_request_ = false;
904    loader_ = CreateResourceLoader(-1, -1);
905    loader_->Start(
906        NewCallback(this, &BufferedDataSource::HttpInitialStartCallback),
907        NewCallback(this, &BufferedDataSource::NetworkEventCallback));
908    return;
909  }
910
911  // We need to prevent calling to filter host and running the callback if
912  // we have received the stop signal. We need to lock down the whole callback
913  // method to prevent bad things from happening. The reason behind this is
914  // that we cannot guarantee tasks on render thread have completely stopped
915  // when we receive the Stop() method call. The only way to solve this is to
916  // let tasks on render thread to run but make sure they don't call outside
917  // this object when Stop() method is ever called. Locking this method is safe
918  // because |lock_| is only acquired in tasks on render thread.
919  AutoLock auto_lock(lock_);
920  if (stop_signal_received_)
921    return;
922
923  if (!success) {
924    host()->SetError(media::PIPELINE_ERROR_NETWORK);
925    DoneInitialization_Locked();
926    return;
927  }
928
929  if (streaming_) {
930    // If the server didn't reply with an instance size, it is likely this
931    // is a streaming response.
932    host()->SetStreaming(true);
933  } else {
934    // This value governs the range that we can seek to.
935    // TODO(hclam): Report the correct value of buffered bytes.
936    host()->SetTotalBytes(total_bytes_);
937    host()->SetBufferedBytes(0);
938  }
939
940  // Currently, only files can be used reliably w/o a network.
941  host()->SetLoaded(false);
942  DoneInitialization_Locked();
943}
944
945void BufferedDataSource::NonHttpInitialStartCallback(int error) {
946  DCHECK(MessageLoop::current() == render_loop_);
947  DCHECK(loader_.get());
948
949  // Check if the request ended up at a different origin via redirect.
950  single_origin_ = url_.GetOrigin() == loader_->url().GetOrigin();
951
952  int64 instance_size = loader_->instance_size();
953  bool success = error == net::OK && instance_size != kPositionNotSpecified;
954
955  if (success) {
956    total_bytes_ = instance_size;
957    loaded_ = true;
958  } else {
959    loader_->Stop();
960  }
961
962  // We need to prevent calling to filter host and running the callback if
963  // we have received the stop signal. We need to lock down the whole callback
964  // method to prevent bad things from happening. The reason behind this is
965  // that we cannot guarantee tasks on render thread have completely stopped
966  // when we receive the Stop() method call. The only way to solve this is to
967  // let tasks on render thread to run but make sure they don't call outside
968  // this object when Stop() method is ever called. Locking this method is safe
969  // because |lock_| is only acquired in tasks on render thread.
970  AutoLock auto_lock(lock_);
971  if (stop_signal_received_)
972    return;
973
974  if (success) {
975    host()->SetTotalBytes(total_bytes_);
976    host()->SetBufferedBytes(total_bytes_);
977    host()->SetLoaded(loaded_);
978  } else {
979    host()->SetError(media::PIPELINE_ERROR_NETWORK);
980  }
981  DoneInitialization_Locked();
982}
983
984void BufferedDataSource::PartialReadStartCallback(int error) {
985  DCHECK(MessageLoop::current() == render_loop_);
986  DCHECK(loader_.get());
987
988  // This callback method is invoked after we have verified the server has
989  // range request capability, so as a safety guard verify again the response
990  // is partial.
991  if (error == net::OK && loader_->partial_response()) {
992    // Once the range request has started successfully, we can proceed with
993    // reading from it.
994    ReadInternal();
995    return;
996  }
997
998  // Stop the resource loader since we have received an error.
999  loader_->Stop();
1000
1001  // We need to prevent calling to filter host and running the callback if
1002  // we have received the stop signal. We need to lock down the whole callback
1003  // method to prevent bad things from happening. The reason behind this is
1004  // that we cannot guarantee tasks on render thread have completely stopped
1005  // when we receive the Stop() method call. So only way to solve this is to
1006  // let tasks on render thread to run but make sure they don't call outside
1007  // this object when Stop() method is ever called. Locking this method is
1008  // safe because |lock_| is only acquired in tasks on render thread.
1009  AutoLock auto_lock(lock_);
1010  if (stop_signal_received_)
1011    return;
1012  DoneRead_Locked(net::ERR_INVALID_RESPONSE);
1013}
1014
1015void BufferedDataSource::ReadCallback(int error) {
1016  DCHECK(MessageLoop::current() == render_loop_);
1017
1018  if (error < 0) {
1019    DCHECK(loader_.get());
1020
1021    // Stop the resource load if it failed.
1022    loader_->Stop();
1023
1024    if (error == net::ERR_CACHE_MISS) {
1025      render_loop_->PostTask(FROM_HERE,
1026          NewRunnableMethod(this, &BufferedDataSource::RestartLoadingTask));
1027      return;
1028    }
1029  }
1030
1031  // We need to prevent calling to filter host and running the callback if
1032  // we have received the stop signal. We need to lock down the whole callback
1033  // method to prevent bad things from happening. The reason behind this is
1034  // that we cannot guarantee tasks on render thread have completely stopped
1035  // when we receive the Stop() method call. So only way to solve this is to
1036  // let tasks on render thread to run but make sure they don't call outside
1037  // this object when Stop() method is ever called. Locking this method is safe
1038  // because |lock_| is only acquired in tasks on render thread.
1039  AutoLock auto_lock(lock_);
1040  if (stop_signal_received_)
1041    return;
1042
1043  if (error > 0) {
1044    // If a position error code is received, read was successful. So copy
1045    // from intermediate read buffer to the target read buffer.
1046    memcpy(read_buffer_, intermediate_read_buffer_.get(), error);
1047  }
1048  DoneRead_Locked(error);
1049}
1050
1051void BufferedDataSource::NetworkEventCallback() {
1052  DCHECK(MessageLoop::current() == render_loop_);
1053  DCHECK(loader_.get());
1054
1055  // In case of non-HTTP request we don't need to report network events,
1056  // so return immediately.
1057  if (loaded_)
1058    return;
1059
1060  bool network_activity = loader_->network_activity();
1061  int64 buffered_last_byte_position = loader_->GetBufferedLastBytePosition();
1062
1063  // If we get an unspecified value, return immediately.
1064  if (buffered_last_byte_position == kPositionNotSpecified)
1065    return;
1066
1067  // We need to prevent calling to filter host and running the callback if
1068  // we have received the stop signal. We need to lock down the whole callback
1069  // method to prevent bad things from happening. The reason behind this is
1070  // that we cannot guarantee tasks on render thread have completely stopped
1071  // when we receive the Stop() method call. So only way to solve this is to
1072  // let tasks on render thread to run but make sure they don't call outside
1073  // this object when Stop() method is ever called. Locking this method is safe
1074  // because |lock_| is only acquired in tasks on render thread.
1075  AutoLock auto_lock(lock_);
1076  if (stop_signal_received_)
1077    return;
1078
1079  if (network_activity != network_activity_) {
1080    network_activity_ = network_activity;
1081    host()->SetNetworkActivity(network_activity);
1082  }
1083  host()->SetBufferedBytes(buffered_last_byte_position + 1);
1084}
1085
1086}  // namespace webkit_glue
1087