pipeline.cc revision 5f1c94371a64b3196d4be9466099bb892df9b88e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "media/base/pipeline.h"
6
7#include <algorithm>
8
9#include "base/bind.h"
10#include "base/callback.h"
11#include "base/callback_helpers.h"
12#include "base/compiler_specific.h"
13#include "base/location.h"
14#include "base/metrics/histogram.h"
15#include "base/single_thread_task_runner.h"
16#include "base/stl_util.h"
17#include "base/strings/string_number_conversions.h"
18#include "base/strings/string_util.h"
19#include "base/synchronization/condition_variable.h"
20#include "media/base/audio_decoder.h"
21#include "media/base/audio_renderer.h"
22#include "media/base/filter_collection.h"
23#include "media/base/media_log.h"
24#include "media/base/text_renderer.h"
25#include "media/base/text_track_config.h"
26#include "media/base/time_delta_interpolator.h"
27#include "media/base/time_source.h"
28#include "media/base/video_decoder.h"
29#include "media/base/video_decoder_config.h"
30#include "media/base/video_renderer.h"
31
32using base::TimeDelta;
33
34namespace media {
35
36Pipeline::Pipeline(
37    const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
38    MediaLog* media_log)
39    : task_runner_(task_runner),
40      media_log_(media_log),
41      running_(false),
42      did_loading_progress_(false),
43      volume_(1.0f),
44      playback_rate_(0.0f),
45      interpolator_(new TimeDeltaInterpolator(&default_tick_clock_)),
46      interpolation_state_(INTERPOLATION_STOPPED),
47      status_(PIPELINE_OK),
48      state_(kCreated),
49      audio_ended_(false),
50      video_ended_(false),
51      text_ended_(false),
52      audio_buffering_state_(BUFFERING_HAVE_NOTHING),
53      video_buffering_state_(BUFFERING_HAVE_NOTHING),
54      demuxer_(NULL),
55      time_source_(NULL),
56      underflow_disabled_for_testing_(false),
57      weak_factory_(this) {
58  media_log_->AddEvent(media_log_->CreatePipelineStateChangedEvent(kCreated));
59  media_log_->AddEvent(
60      media_log_->CreateEvent(MediaLogEvent::PIPELINE_CREATED));
61  interpolator_->SetBounds(base::TimeDelta(), base::TimeDelta());
62}
63
64Pipeline::~Pipeline() {
65  DCHECK(thread_checker_.CalledOnValidThread())
66      << "Pipeline must be destroyed on same thread that created it";
67  DCHECK(!running_) << "Stop() must complete before destroying object";
68  DCHECK(stop_cb_.is_null());
69  DCHECK(seek_cb_.is_null());
70
71  media_log_->AddEvent(
72      media_log_->CreateEvent(MediaLogEvent::PIPELINE_DESTROYED));
73}
74
75void Pipeline::Start(scoped_ptr<FilterCollection> collection,
76                     const base::Closure& ended_cb,
77                     const PipelineStatusCB& error_cb,
78                     const PipelineStatusCB& seek_cb,
79                     const PipelineMetadataCB& metadata_cb,
80                     const BufferingStateCB& buffering_state_cb,
81                     const base::Closure& duration_change_cb) {
82  DCHECK(!ended_cb.is_null());
83  DCHECK(!error_cb.is_null());
84  DCHECK(!seek_cb.is_null());
85  DCHECK(!metadata_cb.is_null());
86  DCHECK(!buffering_state_cb.is_null());
87
88  base::AutoLock auto_lock(lock_);
89  CHECK(!running_) << "Media pipeline is already running";
90  running_ = true;
91
92  filter_collection_ = collection.Pass();
93  ended_cb_ = ended_cb;
94  error_cb_ = error_cb;
95  seek_cb_ = seek_cb;
96  metadata_cb_ = metadata_cb;
97  buffering_state_cb_ = buffering_state_cb;
98  duration_change_cb_ = duration_change_cb;
99
100  task_runner_->PostTask(
101      FROM_HERE, base::Bind(&Pipeline::StartTask, weak_factory_.GetWeakPtr()));
102}
103
104void Pipeline::Stop(const base::Closure& stop_cb) {
105  DVLOG(2) << __FUNCTION__;
106  task_runner_->PostTask(
107      FROM_HERE,
108      base::Bind(&Pipeline::StopTask, weak_factory_.GetWeakPtr(), stop_cb));
109}
110
111void Pipeline::Seek(TimeDelta time, const PipelineStatusCB& seek_cb) {
112  base::AutoLock auto_lock(lock_);
113  if (!running_) {
114    DLOG(ERROR) << "Media pipeline isn't running. Ignoring Seek().";
115    return;
116  }
117
118  task_runner_->PostTask(
119      FROM_HERE,
120      base::Bind(
121          &Pipeline::SeekTask, weak_factory_.GetWeakPtr(), time, seek_cb));
122}
123
124bool Pipeline::IsRunning() const {
125  base::AutoLock auto_lock(lock_);
126  return running_;
127}
128
129float Pipeline::GetPlaybackRate() const {
130  base::AutoLock auto_lock(lock_);
131  return playback_rate_;
132}
133
134void Pipeline::SetPlaybackRate(float playback_rate) {
135  if (playback_rate < 0.0f)
136    return;
137
138  base::AutoLock auto_lock(lock_);
139  playback_rate_ = playback_rate;
140  if (running_) {
141    task_runner_->PostTask(FROM_HERE,
142                           base::Bind(&Pipeline::PlaybackRateChangedTask,
143                                      weak_factory_.GetWeakPtr(),
144                                      playback_rate));
145  }
146}
147
148float Pipeline::GetVolume() const {
149  base::AutoLock auto_lock(lock_);
150  return volume_;
151}
152
153void Pipeline::SetVolume(float volume) {
154  if (volume < 0.0f || volume > 1.0f)
155    return;
156
157  base::AutoLock auto_lock(lock_);
158  volume_ = volume;
159  if (running_) {
160    task_runner_->PostTask(
161        FROM_HERE,
162        base::Bind(
163            &Pipeline::VolumeChangedTask, weak_factory_.GetWeakPtr(), volume));
164  }
165}
166
167TimeDelta Pipeline::GetMediaTime() const {
168  base::AutoLock auto_lock(lock_);
169  return std::min(interpolator_->GetInterpolatedTime(), duration_);
170}
171
172Ranges<TimeDelta> Pipeline::GetBufferedTimeRanges() const {
173  base::AutoLock auto_lock(lock_);
174  return buffered_time_ranges_;
175}
176
177TimeDelta Pipeline::GetMediaDuration() const {
178  base::AutoLock auto_lock(lock_);
179  return duration_;
180}
181
182bool Pipeline::DidLoadingProgress() {
183  base::AutoLock auto_lock(lock_);
184  bool ret = did_loading_progress_;
185  did_loading_progress_ = false;
186  return ret;
187}
188
189PipelineStatistics Pipeline::GetStatistics() const {
190  base::AutoLock auto_lock(lock_);
191  return statistics_;
192}
193
194void Pipeline::SetTimeDeltaInterpolatorForTesting(
195    TimeDeltaInterpolator* interpolator) {
196  interpolator_.reset(interpolator);
197}
198
199void Pipeline::SetErrorForTesting(PipelineStatus status) {
200  OnError(status);
201}
202
203void Pipeline::SetState(State next_state) {
204  DVLOG(1) << GetStateString(state_) << " -> " << GetStateString(next_state);
205
206  state_ = next_state;
207  media_log_->AddEvent(media_log_->CreatePipelineStateChangedEvent(next_state));
208}
209
210#define RETURN_STRING(state) case state: return #state;
211
212const char* Pipeline::GetStateString(State state) {
213  switch (state) {
214    RETURN_STRING(kCreated);
215    RETURN_STRING(kInitDemuxer);
216    RETURN_STRING(kInitAudioRenderer);
217    RETURN_STRING(kInitVideoRenderer);
218    RETURN_STRING(kSeeking);
219    RETURN_STRING(kPlaying);
220    RETURN_STRING(kStopping);
221    RETURN_STRING(kStopped);
222  }
223  NOTREACHED();
224  return "INVALID";
225}
226
227#undef RETURN_STRING
228
229Pipeline::State Pipeline::GetNextState() const {
230  DCHECK(task_runner_->BelongsToCurrentThread());
231  DCHECK(stop_cb_.is_null())
232      << "State transitions don't happen when stopping";
233  DCHECK_EQ(status_, PIPELINE_OK)
234      << "State transitions don't happen when there's an error: " << status_;
235
236  switch (state_) {
237    case kCreated:
238      return kInitDemuxer;
239
240    case kInitDemuxer:
241      if (demuxer_->GetStream(DemuxerStream::AUDIO))
242        return kInitAudioRenderer;
243      if (demuxer_->GetStream(DemuxerStream::VIDEO))
244        return kInitVideoRenderer;
245      return kPlaying;
246
247    case kInitAudioRenderer:
248      if (demuxer_->GetStream(DemuxerStream::VIDEO))
249        return kInitVideoRenderer;
250      return kPlaying;
251
252    case kInitVideoRenderer:
253      return kPlaying;
254
255    case kSeeking:
256      return kPlaying;
257
258    case kPlaying:
259    case kStopping:
260    case kStopped:
261      break;
262  }
263  NOTREACHED() << "State has no transition: " << state_;
264  return state_;
265}
266
267void Pipeline::OnDemuxerError(PipelineStatus error) {
268  task_runner_->PostTask(FROM_HERE,
269                         base::Bind(&Pipeline::ErrorChangedTask,
270                                    weak_factory_.GetWeakPtr(),
271                                    error));
272}
273
274void Pipeline::AddTextStream(DemuxerStream* text_stream,
275                             const TextTrackConfig& config) {
276  task_runner_->PostTask(FROM_HERE,
277                         base::Bind(&Pipeline::AddTextStreamTask,
278                                    weak_factory_.GetWeakPtr(),
279                                    text_stream,
280                                    config));
281}
282
283void Pipeline::RemoveTextStream(DemuxerStream* text_stream) {
284  task_runner_->PostTask(FROM_HERE,
285                         base::Bind(&Pipeline::RemoveTextStreamTask,
286                                    weak_factory_.GetWeakPtr(),
287                                    text_stream));
288}
289
290void Pipeline::OnError(PipelineStatus error) {
291  DCHECK(task_runner_->BelongsToCurrentThread());
292  DCHECK(IsRunning());
293  DCHECK_NE(PIPELINE_OK, error);
294  VLOG(1) << "Media pipeline error: " << error;
295
296  task_runner_->PostTask(FROM_HERE, base::Bind(
297      &Pipeline::ErrorChangedTask, weak_factory_.GetWeakPtr(), error));
298}
299
300void Pipeline::OnAudioTimeUpdate(TimeDelta time, TimeDelta max_time) {
301  DCHECK(task_runner_->BelongsToCurrentThread());
302  DCHECK_LE(time.InMicroseconds(), max_time.InMicroseconds());
303  base::AutoLock auto_lock(lock_);
304
305  if (interpolation_state_ == INTERPOLATION_WAITING_FOR_AUDIO_TIME_UPDATE &&
306      time < interpolator_->GetInterpolatedTime()) {
307    return;
308  }
309
310  if (state_ == kSeeking)
311    return;
312
313  interpolator_->SetBounds(time, max_time);
314  StartClockIfWaitingForTimeUpdate_Locked();
315}
316
317void Pipeline::OnVideoTimeUpdate(TimeDelta max_time) {
318  DCHECK(task_runner_->BelongsToCurrentThread());
319
320  if (audio_renderer_)
321    return;
322
323  if (state_ == kSeeking)
324    return;
325
326  base::AutoLock auto_lock(lock_);
327  DCHECK_NE(interpolation_state_, INTERPOLATION_WAITING_FOR_AUDIO_TIME_UPDATE);
328  interpolator_->SetUpperBound(max_time);
329}
330
331void Pipeline::SetDuration(TimeDelta duration) {
332  DCHECK(IsRunning());
333  media_log_->AddEvent(
334      media_log_->CreateTimeEvent(
335          MediaLogEvent::DURATION_SET, "duration", duration));
336  UMA_HISTOGRAM_LONG_TIMES("Media.Duration", duration);
337
338  base::AutoLock auto_lock(lock_);
339  duration_ = duration;
340  if (!duration_change_cb_.is_null())
341    duration_change_cb_.Run();
342}
343
344void Pipeline::OnStateTransition(PipelineStatus status) {
345  DCHECK(task_runner_->BelongsToCurrentThread());
346  // Force post to process state transitions after current execution frame.
347  task_runner_->PostTask(
348      FROM_HERE,
349      base::Bind(
350          &Pipeline::StateTransitionTask, weak_factory_.GetWeakPtr(), status));
351}
352
353void Pipeline::StateTransitionTask(PipelineStatus status) {
354  DCHECK(task_runner_->BelongsToCurrentThread());
355
356  // No-op any state transitions if we're stopping.
357  if (state_ == kStopping || state_ == kStopped)
358    return;
359
360  // Preserve existing abnormal status, otherwise update based on the result of
361  // the previous operation.
362  status_ = (status_ != PIPELINE_OK ? status_ : status);
363
364  if (status_ != PIPELINE_OK) {
365    ErrorChangedTask(status_);
366    return;
367  }
368
369  // Guard against accidentally clearing |pending_callbacks_| for states that
370  // use it as well as states that should not be using it.
371  DCHECK_EQ(pending_callbacks_.get() != NULL, state_ == kSeeking);
372
373  pending_callbacks_.reset();
374
375  PipelineStatusCB done_cb =
376      base::Bind(&Pipeline::OnStateTransition, weak_factory_.GetWeakPtr());
377
378  // Switch states, performing any entrance actions for the new state as well.
379  SetState(GetNextState());
380  switch (state_) {
381    case kInitDemuxer:
382      return InitializeDemuxer(done_cb);
383
384    case kInitAudioRenderer:
385      return InitializeAudioRenderer(done_cb);
386
387    case kInitVideoRenderer:
388      return InitializeVideoRenderer(done_cb);
389
390    case kPlaying:
391      // Finish initial start sequence the first time we enter the playing
392      // state.
393      if (filter_collection_) {
394        filter_collection_.reset();
395        if (!audio_renderer_ && !video_renderer_) {
396          ErrorChangedTask(PIPELINE_ERROR_COULD_NOT_RENDER);
397          return;
398        }
399
400        if (audio_renderer_)
401          time_source_ = audio_renderer_->GetTimeSource();
402
403        {
404          PipelineMetadata metadata;
405          metadata.has_audio = audio_renderer_;
406          metadata.has_video = video_renderer_;
407          metadata.timeline_offset = demuxer_->GetTimelineOffset();
408          DemuxerStream* stream = demuxer_->GetStream(DemuxerStream::VIDEO);
409          if (stream) {
410            metadata.natural_size =
411                stream->video_decoder_config().natural_size();
412            metadata.video_rotation = stream->video_rotation();
413          }
414          metadata_cb_.Run(metadata);
415        }
416      }
417
418      base::ResetAndReturn(&seek_cb_).Run(PIPELINE_OK);
419
420      {
421        base::AutoLock auto_lock(lock_);
422        interpolator_->SetBounds(start_timestamp_, start_timestamp_);
423      }
424
425      if (time_source_)
426        time_source_->SetMediaTime(start_timestamp_);
427      if (audio_renderer_)
428        audio_renderer_->StartPlaying();
429      if (video_renderer_)
430        video_renderer_->StartPlaying();
431      if (text_renderer_)
432        text_renderer_->StartPlaying();
433
434      PlaybackRateChangedTask(GetPlaybackRate());
435      VolumeChangedTask(GetVolume());
436      return;
437
438    case kStopping:
439    case kStopped:
440    case kCreated:
441    case kSeeking:
442      NOTREACHED() << "State has no transition: " << state_;
443      return;
444  }
445}
446
447// Note that the usage of base::Unretained() with the audio/video renderers
448// in the following DoXXX() functions is considered safe as they are owned by
449// |pending_callbacks_| and share the same lifetime.
450//
451// That being said, deleting the renderers while keeping |pending_callbacks_|
452// running on the media thread would result in crashes.
453
#if DCHECK_IS_ON
// Debug-only check that both buffering states pointed to have the value
// BUFFERING_HAVE_NOTHING. DoSeek() queues it right after the audio/video
// flushes.
static void VerifyBufferingStates(BufferingState* audio_buffering_state,
                                  BufferingState* video_buffering_state) {
  DCHECK_EQ(*audio_buffering_state, BUFFERING_HAVE_NOTHING);
  DCHECK_EQ(*video_buffering_state, BUFFERING_HAVE_NOTHING);
}
#endif  // DCHECK_IS_ON
461
462void Pipeline::DoSeek(
463    base::TimeDelta seek_timestamp,
464    const PipelineStatusCB& done_cb) {
465  DCHECK(task_runner_->BelongsToCurrentThread());
466  DCHECK(!pending_callbacks_.get());
467  SerialRunner::Queue bound_fns;
468  {
469    base::AutoLock auto_lock(lock_);
470    PauseClockAndStopTicking_Locked();
471  }
472
473  // Pause.
474  if (text_renderer_) {
475    bound_fns.Push(base::Bind(
476        &TextRenderer::Pause, base::Unretained(text_renderer_.get())));
477  }
478
479  // Flush.
480  if (audio_renderer_) {
481    bound_fns.Push(base::Bind(
482        &AudioRenderer::Flush, base::Unretained(audio_renderer_.get())));
483  }
484
485  if (video_renderer_) {
486    bound_fns.Push(base::Bind(
487        &VideoRenderer::Flush, base::Unretained(video_renderer_.get())));
488  }
489
490#if DCHECK_IS_ON
491  // Verify renderers reset their buffering states.
492  bound_fns.Push(base::Bind(&VerifyBufferingStates,
493                            &audio_buffering_state_,
494                            &video_buffering_state_));
495#endif
496
497  if (text_renderer_) {
498    bound_fns.Push(base::Bind(
499        &TextRenderer::Flush, base::Unretained(text_renderer_.get())));
500  }
501
502  // Seek demuxer.
503  bound_fns.Push(base::Bind(
504      &Demuxer::Seek, base::Unretained(demuxer_), seek_timestamp));
505
506  pending_callbacks_ = SerialRunner::Run(bound_fns, done_cb);
507}
508
509void Pipeline::DoStop(const PipelineStatusCB& done_cb) {
510  DVLOG(2) << __FUNCTION__;
511  DCHECK(task_runner_->BelongsToCurrentThread());
512  DCHECK(!pending_callbacks_.get());
513
514  audio_renderer_.reset();
515  video_renderer_.reset();
516  text_renderer_.reset();
517
518  if (demuxer_) {
519    demuxer_->Stop(base::Bind(done_cb, PIPELINE_OK));
520    return;
521  }
522
523  task_runner_->PostTask(FROM_HERE, base::Bind(done_cb, PIPELINE_OK));
524}
525
526void Pipeline::OnStopCompleted(PipelineStatus status) {
527  DVLOG(2) << __FUNCTION__;
528  DCHECK(task_runner_->BelongsToCurrentThread());
529  DCHECK_EQ(state_, kStopping);
530  DCHECK(!audio_renderer_);
531  DCHECK(!video_renderer_);
532  DCHECK(!text_renderer_);
533  {
534    base::AutoLock l(lock_);
535    running_ = false;
536  }
537
538  SetState(kStopped);
539  filter_collection_.reset();
540  demuxer_ = NULL;
541
542  // If we stop during initialization/seeking we want to run |seek_cb_|
543  // followed by |stop_cb_| so we don't leave outstanding callbacks around.
544  if (!seek_cb_.is_null()) {
545    base::ResetAndReturn(&seek_cb_).Run(status_);
546    error_cb_.Reset();
547  }
548  if (!stop_cb_.is_null()) {
549    error_cb_.Reset();
550
551    // Invalid all weak pointers so it's safe to destroy |this| on the render
552    // main thread.
553    weak_factory_.InvalidateWeakPtrs();
554
555    base::ResetAndReturn(&stop_cb_).Run();
556
557    // NOTE: pipeline may be deleted at this point in time as a result of
558    // executing |stop_cb_|.
559    return;
560  }
561  if (!error_cb_.is_null()) {
562    DCHECK_NE(status_, PIPELINE_OK);
563    base::ResetAndReturn(&error_cb_).Run(status_);
564  }
565}
566
567void Pipeline::AddBufferedTimeRange(base::TimeDelta start,
568                                    base::TimeDelta end) {
569  DCHECK(IsRunning());
570  base::AutoLock auto_lock(lock_);
571  buffered_time_ranges_.Add(start, end);
572  did_loading_progress_ = true;
573}
574
575// Called from any thread.
576void Pipeline::OnUpdateStatistics(const PipelineStatistics& stats) {
577  base::AutoLock auto_lock(lock_);
578  statistics_.audio_bytes_decoded += stats.audio_bytes_decoded;
579  statistics_.video_bytes_decoded += stats.video_bytes_decoded;
580  statistics_.video_frames_decoded += stats.video_frames_decoded;
581  statistics_.video_frames_dropped += stats.video_frames_dropped;
582}
583
584void Pipeline::StartTask() {
585  DCHECK(task_runner_->BelongsToCurrentThread());
586
587  CHECK_EQ(kCreated, state_)
588      << "Media pipeline cannot be started more than once";
589
590  text_renderer_ = filter_collection_->GetTextRenderer();
591
592  if (text_renderer_) {
593    text_renderer_->Initialize(
594        base::Bind(&Pipeline::OnTextRendererEnded, weak_factory_.GetWeakPtr()));
595  }
596
597  StateTransitionTask(PIPELINE_OK);
598}
599
600void Pipeline::StopTask(const base::Closure& stop_cb) {
601  DCHECK(task_runner_->BelongsToCurrentThread());
602  DCHECK(stop_cb_.is_null());
603
604  if (state_ == kStopped) {
605    stop_cb.Run();
606    return;
607  }
608
609  stop_cb_ = stop_cb;
610
611  // We may already be stopping due to a runtime error.
612  if (state_ == kStopping)
613    return;
614
615  SetState(kStopping);
616  pending_callbacks_.reset();
617  DoStop(base::Bind(&Pipeline::OnStopCompleted, weak_factory_.GetWeakPtr()));
618}
619
620void Pipeline::ErrorChangedTask(PipelineStatus error) {
621  DCHECK(task_runner_->BelongsToCurrentThread());
622  DCHECK_NE(PIPELINE_OK, error) << "PIPELINE_OK isn't an error!";
623
624  media_log_->AddEvent(media_log_->CreatePipelineErrorEvent(error));
625
626  if (state_ == kStopping || state_ == kStopped)
627    return;
628
629  SetState(kStopping);
630  pending_callbacks_.reset();
631  status_ = error;
632
633  DoStop(base::Bind(&Pipeline::OnStopCompleted, weak_factory_.GetWeakPtr()));
634}
635
636void Pipeline::PlaybackRateChangedTask(float playback_rate) {
637  DCHECK(task_runner_->BelongsToCurrentThread());
638
639  // Playback rate changes are only carried out while playing.
640  if (state_ != kPlaying)
641    return;
642
643  {
644    base::AutoLock auto_lock(lock_);
645    interpolator_->SetPlaybackRate(playback_rate);
646  }
647
648  if (time_source_)
649    time_source_->SetPlaybackRate(playback_rate_);
650}
651
652void Pipeline::VolumeChangedTask(float volume) {
653  DCHECK(task_runner_->BelongsToCurrentThread());
654
655  // Volume changes are only carried out while playing.
656  if (state_ != kPlaying)
657    return;
658
659  if (audio_renderer_)
660    audio_renderer_->SetVolume(volume);
661}
662
663void Pipeline::SeekTask(TimeDelta time, const PipelineStatusCB& seek_cb) {
664  DCHECK(task_runner_->BelongsToCurrentThread());
665  DCHECK(stop_cb_.is_null());
666
667  // Suppress seeking if we're not fully started.
668  if (state_ != kPlaying) {
669    DCHECK(state_ == kStopping || state_ == kStopped)
670        << "Receive extra seek in unexpected state: " << state_;
671
672    // TODO(scherkus): should we run the callback?  I'm tempted to say the API
673    // will only execute the first Seek() request.
674    DVLOG(1) << "Media pipeline has not started, ignoring seek to "
675             << time.InMicroseconds() << " (current state: " << state_ << ")";
676    return;
677  }
678
679  DCHECK(seek_cb_.is_null());
680
681  SetState(kSeeking);
682  seek_cb_ = seek_cb;
683  audio_ended_ = false;
684  video_ended_ = false;
685  text_ended_ = false;
686  start_timestamp_ = time;
687
688  DoSeek(time,
689         base::Bind(&Pipeline::OnStateTransition, weak_factory_.GetWeakPtr()));
690}
691
692void Pipeline::OnAudioRendererEnded() {
693  DCHECK(task_runner_->BelongsToCurrentThread());
694  media_log_->AddEvent(media_log_->CreateEvent(MediaLogEvent::AUDIO_ENDED));
695
696  if (state_ != kPlaying)
697    return;
698
699  DCHECK(!audio_ended_);
700  audio_ended_ = true;
701
702  // Start clock since there is no more audio to trigger clock updates.
703  {
704    base::AutoLock auto_lock(lock_);
705    interpolator_->SetUpperBound(duration_);
706    StartClockIfWaitingForTimeUpdate_Locked();
707  }
708
709  RunEndedCallbackIfNeeded();
710}
711
712void Pipeline::OnVideoRendererEnded() {
713  DCHECK(task_runner_->BelongsToCurrentThread());
714  media_log_->AddEvent(media_log_->CreateEvent(MediaLogEvent::VIDEO_ENDED));
715
716  if (state_ != kPlaying)
717    return;
718
719  DCHECK(!video_ended_);
720  video_ended_ = true;
721
722  RunEndedCallbackIfNeeded();
723}
724
725void Pipeline::OnTextRendererEnded() {
726  DCHECK(task_runner_->BelongsToCurrentThread());
727  media_log_->AddEvent(media_log_->CreateEvent(MediaLogEvent::TEXT_ENDED));
728
729  if (state_ != kPlaying)
730    return;
731
732  DCHECK(!text_ended_);
733  text_ended_ = true;
734
735  RunEndedCallbackIfNeeded();
736}
737
738void Pipeline::RunEndedCallbackIfNeeded() {
739  DCHECK(task_runner_->BelongsToCurrentThread());
740
741  if (audio_renderer_ && !audio_ended_)
742    return;
743
744  if (video_renderer_ && !video_ended_)
745    return;
746
747  if (text_renderer_ && text_renderer_->HasTracks() && !text_ended_)
748    return;
749
750  {
751    base::AutoLock auto_lock(lock_);
752    PauseClockAndStopTicking_Locked();
753    interpolator_->SetBounds(duration_, duration_);
754  }
755
756  DCHECK_EQ(status_, PIPELINE_OK);
757  ended_cb_.Run();
758}
759
760void Pipeline::AddTextStreamTask(DemuxerStream* text_stream,
761                                 const TextTrackConfig& config) {
762  DCHECK(task_runner_->BelongsToCurrentThread());
763  // TODO(matthewjheaney): fix up text_ended_ when text stream
764  // is added (http://crbug.com/321446).
765  text_renderer_->AddTextStream(text_stream, config);
766}
767
768void Pipeline::RemoveTextStreamTask(DemuxerStream* text_stream) {
769  DCHECK(task_runner_->BelongsToCurrentThread());
770  text_renderer_->RemoveTextStream(text_stream);
771}
772
773void Pipeline::InitializeDemuxer(const PipelineStatusCB& done_cb) {
774  DCHECK(task_runner_->BelongsToCurrentThread());
775
776  demuxer_ = filter_collection_->GetDemuxer();
777  demuxer_->Initialize(this, done_cb, text_renderer_);
778}
779
780void Pipeline::InitializeAudioRenderer(const PipelineStatusCB& done_cb) {
781  DCHECK(task_runner_->BelongsToCurrentThread());
782
783  audio_renderer_ = filter_collection_->GetAudioRenderer();
784  base::WeakPtr<Pipeline> weak_this = weak_factory_.GetWeakPtr();
785  audio_renderer_->Initialize(
786      demuxer_->GetStream(DemuxerStream::AUDIO),
787      done_cb,
788      base::Bind(&Pipeline::OnUpdateStatistics, weak_this),
789      base::Bind(&Pipeline::OnAudioTimeUpdate, weak_this),
790      base::Bind(&Pipeline::BufferingStateChanged, weak_this,
791                 &audio_buffering_state_),
792      base::Bind(&Pipeline::OnAudioRendererEnded, weak_this),
793      base::Bind(&Pipeline::OnError, weak_this));
794}
795
796void Pipeline::InitializeVideoRenderer(const PipelineStatusCB& done_cb) {
797  DCHECK(task_runner_->BelongsToCurrentThread());
798
799  video_renderer_ = filter_collection_->GetVideoRenderer();
800  base::WeakPtr<Pipeline> weak_this = weak_factory_.GetWeakPtr();
801  video_renderer_->Initialize(
802      demuxer_->GetStream(DemuxerStream::VIDEO),
803      demuxer_->GetLiveness() == Demuxer::LIVENESS_LIVE,
804      done_cb,
805      base::Bind(&Pipeline::OnUpdateStatistics, weak_this),
806      base::Bind(&Pipeline::OnVideoTimeUpdate, weak_this),
807      base::Bind(&Pipeline::BufferingStateChanged, weak_this,
808                 &video_buffering_state_),
809      base::Bind(&Pipeline::OnVideoRendererEnded, weak_this),
810      base::Bind(&Pipeline::OnError, weak_this),
811      base::Bind(&Pipeline::GetMediaTime, base::Unretained(this)),
812      base::Bind(&Pipeline::GetMediaDuration, base::Unretained(this)));
813}
814
815void Pipeline::BufferingStateChanged(BufferingState* buffering_state,
816                                     BufferingState new_buffering_state) {
817  DVLOG(1) << __FUNCTION__ << "(" << *buffering_state << ", "
818           << " " << new_buffering_state << ") "
819           << (buffering_state == &audio_buffering_state_ ? "audio" : "video");
820  DCHECK(task_runner_->BelongsToCurrentThread());
821  bool was_waiting_for_enough_data = WaitingForEnoughData();
822
823  *buffering_state = new_buffering_state;
824
825  // Disable underflow by ignoring updates that renderers have ran out of data
826  // after we have started the clock.
827  if (state_ == kPlaying && underflow_disabled_for_testing_ &&
828      interpolation_state_ != INTERPOLATION_STOPPED) {
829    return;
830  }
831
832  // Renderer underflowed.
833  if (!was_waiting_for_enough_data && WaitingForEnoughData()) {
834    PausePlayback();
835
836    // TODO(scherkus): Fire BUFFERING_HAVE_NOTHING callback to alert clients of
837    // underflow state http://crbug.com/144683
838    return;
839  }
840
841  // Renderer prerolled.
842  if (was_waiting_for_enough_data && !WaitingForEnoughData()) {
843    StartPlayback();
844    buffering_state_cb_.Run(BUFFERING_HAVE_ENOUGH);
845    return;
846  }
847}
848
849bool Pipeline::WaitingForEnoughData() const {
850  DCHECK(task_runner_->BelongsToCurrentThread());
851  if (state_ != kPlaying)
852    return false;
853  if (audio_renderer_ && audio_buffering_state_ != BUFFERING_HAVE_ENOUGH)
854    return true;
855  if (video_renderer_ && video_buffering_state_ != BUFFERING_HAVE_ENOUGH)
856    return true;
857  return false;
858}
859
860void Pipeline::PausePlayback() {
861  DVLOG(1) << __FUNCTION__;
862  DCHECK_EQ(state_, kPlaying);
863  DCHECK(WaitingForEnoughData());
864  DCHECK(task_runner_->BelongsToCurrentThread());
865
866  base::AutoLock auto_lock(lock_);
867  PauseClockAndStopTicking_Locked();
868}
869
870void Pipeline::StartPlayback() {
871  DVLOG(1) << __FUNCTION__;
872  DCHECK_EQ(state_, kPlaying);
873  DCHECK_EQ(interpolation_state_, INTERPOLATION_STOPPED);
874  DCHECK(!WaitingForEnoughData());
875  DCHECK(task_runner_->BelongsToCurrentThread());
876
877  if (time_source_) {
878    // We use audio stream to update the clock. So if there is such a
879    // stream, we pause the clock until we receive a valid timestamp.
880    base::AutoLock auto_lock(lock_);
881    interpolation_state_ = INTERPOLATION_WAITING_FOR_AUDIO_TIME_UPDATE;
882    time_source_->StartTicking();
883  } else {
884    base::AutoLock auto_lock(lock_);
885    interpolation_state_ = INTERPOLATION_STARTED;
886    interpolator_->SetUpperBound(duration_);
887    interpolator_->StartInterpolating();
888  }
889}
890
891void Pipeline::PauseClockAndStopTicking_Locked() {
892  lock_.AssertAcquired();
893  switch (interpolation_state_) {
894    case INTERPOLATION_STOPPED:
895      return;
896
897    case INTERPOLATION_WAITING_FOR_AUDIO_TIME_UPDATE:
898      time_source_->StopTicking();
899      break;
900
901    case INTERPOLATION_STARTED:
902      if (time_source_)
903        time_source_->StopTicking();
904      interpolator_->StopInterpolating();
905      break;
906  }
907
908  interpolation_state_ = INTERPOLATION_STOPPED;
909}
910
911void Pipeline::StartClockIfWaitingForTimeUpdate_Locked() {
912  lock_.AssertAcquired();
913  if (interpolation_state_ != INTERPOLATION_WAITING_FOR_AUDIO_TIME_UPDATE)
914    return;
915
916  interpolation_state_ = INTERPOLATION_STARTED;
917  interpolator_->StartInterpolating();
918}
919
920}  // namespace media
921