frame_receiver.cc revision 46d4c2bc3267f3f028f39e7e311b0f89aba2e4fd
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "media/cast/receiver/frame_receiver.h"
6
7#include <algorithm>
8
9#include "base/big_endian.h"
10#include "base/bind.h"
11#include "base/logging.h"
12#include "base/message_loop/message_loop.h"
13#include "media/cast/cast_environment.h"
14
namespace {

// Lower bound, in milliseconds, on the delay used when posting the periodic
// cast-message and RTCP-report tasks.  The scheduling methods below clamp
// their computed delay to at least this value.
const int kMinSchedulingDelayMs = 1;

}  // namespace
18
19namespace media {
20namespace cast {
21
22FrameReceiver::FrameReceiver(
23    const scoped_refptr<CastEnvironment>& cast_environment,
24    const FrameReceiverConfig& config,
25    EventMediaType event_media_type,
26    transport::PacedPacketSender* const packet_sender)
27    : cast_environment_(cast_environment),
28      packet_parser_(config.incoming_ssrc, config.rtp_payload_type),
29      stats_(cast_environment->Clock()),
30      event_media_type_(event_media_type),
31      event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type),
32      rtp_timebase_(config.frequency),
33      target_playout_delay_(
34          base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)),
35      expected_frame_duration_(
36          base::TimeDelta::FromSeconds(1) / config.max_frame_rate),
37      reports_are_scheduled_(false),
38      framer_(cast_environment->Clock(),
39              this,
40              config.incoming_ssrc,
41              true,
42              config.rtp_max_delay_ms * config.max_frame_rate / 1000),
43      rtcp_(cast_environment_,
44            NULL,
45            NULL,
46            packet_sender,
47            &stats_,
48            config.rtcp_mode,
49            base::TimeDelta::FromMilliseconds(config.rtcp_interval),
50            config.feedback_ssrc,
51            config.incoming_ssrc,
52            config.rtcp_c_name,
53            event_media_type),
54      is_waiting_for_consecutive_frame_(false),
55      lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()),
56      weak_factory_(this) {
57  DCHECK_GT(config.rtp_max_delay_ms, 0);
58  DCHECK_GT(config.max_frame_rate, 0);
59  decryptor_.Initialize(config.aes_key, config.aes_iv_mask);
60  rtcp_.SetTargetDelay(target_playout_delay_);
61  cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber_);
62  memset(frame_id_to_rtp_timestamp_, 0, sizeof(frame_id_to_rtp_timestamp_));
63}
64
65FrameReceiver::~FrameReceiver() {
66  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
67  cast_environment_->Logging()->RemoveRawEventSubscriber(&event_subscriber_);
68}
69
70void FrameReceiver::RequestEncodedFrame(
71    const ReceiveEncodedFrameCallback& callback) {
72  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
73  frame_request_queue_.push_back(callback);
74  EmitAvailableEncodedFrames();
75}
76
77bool FrameReceiver::ProcessPacket(scoped_ptr<Packet> packet) {
78  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
79
80  if (Rtcp::IsRtcpPacket(&packet->front(), packet->size())) {
81    rtcp_.IncomingRtcpPacket(&packet->front(), packet->size());
82  } else {
83    RtpCastHeader rtp_header;
84    const uint8* payload_data;
85    size_t payload_size;
86    if (!packet_parser_.ParsePacket(&packet->front(),
87                                    packet->size(),
88                                    &rtp_header,
89                                    &payload_data,
90                                    &payload_size)) {
91      return false;
92    }
93
94    ProcessParsedPacket(rtp_header, payload_data, payload_size);
95    stats_.UpdateStatistics(rtp_header);
96  }
97
98  if (!reports_are_scheduled_) {
99    ScheduleNextRtcpReport();
100    ScheduleNextCastMessage();
101    reports_are_scheduled_ = true;
102  }
103
104  return true;
105}
106
107// static
108bool FrameReceiver::ParseSenderSsrc(const uint8* packet,
109                                    size_t length,
110                                    uint32* ssrc) {
111  base::BigEndianReader big_endian_reader(
112      reinterpret_cast<const char*>(packet), length);
113  return big_endian_reader.Skip(8) && big_endian_reader.ReadU32(ssrc);
114}
115
116void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header,
117                                        const uint8* payload_data,
118                                        size_t payload_size) {
119  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
120
121  const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
122
123  frame_id_to_rtp_timestamp_[rtp_header.frame_id & 0xff] =
124      rtp_header.rtp_timestamp;
125  cast_environment_->Logging()->InsertPacketEvent(
126      now, PACKET_RECEIVED, event_media_type_, rtp_header.rtp_timestamp,
127      rtp_header.frame_id, rtp_header.packet_id, rtp_header.max_packet_id,
128      payload_size);
129
130  bool duplicate = false;
131  const bool complete =
132      framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate);
133
134  // Duplicate packets are ignored.
135  if (duplicate)
136    return;
137
138  // Update lip-sync values upon receiving the first packet of each frame, or if
139  // they have never been set yet.
140  if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) {
141    RtpTimestamp fresh_sync_rtp;
142    base::TimeTicks fresh_sync_reference;
143    if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) {
144      // HACK: The sender should have provided Sender Reports before the first
145      // frame was sent.  However, the spec does not currently require this.
146      // Therefore, when the data is missing, the local clock is used to
147      // generate reference timestamps.
148      VLOG(2) << "Lip sync info missing.  Falling-back to local clock.";
149      fresh_sync_rtp = rtp_header.rtp_timestamp;
150      fresh_sync_reference = now;
151    }
152    // |lip_sync_reference_time_| is always incremented according to the time
153    // delta computed from the difference in RTP timestamps.  Then,
154    // |lip_sync_drift_| accounts for clock drift and also smoothes-out any
155    // sudden/discontinuous shifts in the series of reference time values.
156    if (lip_sync_reference_time_.is_null()) {
157      lip_sync_reference_time_ = fresh_sync_reference;
158    } else {
159      lip_sync_reference_time_ += RtpDeltaToTimeDelta(
160          static_cast<int32>(fresh_sync_rtp - lip_sync_rtp_timestamp_),
161          rtp_timebase_);
162    }
163    lip_sync_rtp_timestamp_ = fresh_sync_rtp;
164    lip_sync_drift_.Update(
165        now, fresh_sync_reference - lip_sync_reference_time_);
166  }
167
168  // Another frame is complete from a non-duplicate packet.  Attempt to emit
169  // more frames to satisfy enqueued requests.
170  if (complete)
171    EmitAvailableEncodedFrames();
172}
173
174void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) {
175  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
176
177  base::TimeTicks now = cast_environment_->Clock()->NowTicks();
178  RtpTimestamp rtp_timestamp =
179      frame_id_to_rtp_timestamp_[cast_message.ack_frame_id_ & 0xff];
180  cast_environment_->Logging()->InsertFrameEvent(
181      now, FRAME_ACK_SENT, event_media_type_,
182      rtp_timestamp, cast_message.ack_frame_id_);
183
184  ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events;
185  event_subscriber_.GetRtcpEventsAndReset(&rtcp_events);
186  rtcp_.SendRtcpFromRtpReceiver(&cast_message, &rtcp_events);
187}
188
189void FrameReceiver::EmitAvailableEncodedFrames() {
190  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
191
192  while (!frame_request_queue_.empty()) {
193    // Attempt to peek at the next completed frame from the |framer_|.
194    // TODO(miu): We should only be peeking at the metadata, and not copying the
195    // payload yet!  Or, at least, peek using a StringPiece instead of a copy.
196    scoped_ptr<transport::EncodedFrame> encoded_frame(
197        new transport::EncodedFrame());
198    bool is_consecutively_next_frame = false;
199    bool have_multiple_complete_frames = false;
200    if (!framer_.GetEncodedFrame(encoded_frame.get(),
201                                 &is_consecutively_next_frame,
202                                 &have_multiple_complete_frames)) {
203      VLOG(1) << "Wait for more packets to produce a completed frame.";
204      return;  // ProcessParsedPacket() will invoke this method in the future.
205    }
206
207    const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
208    const base::TimeTicks playout_time =
209        GetPlayoutTime(encoded_frame->rtp_timestamp);
210
211    // If we have multiple decodable frames, and the current frame is
212    // too old, then skip it and decode the next frame instead.
213    if (have_multiple_complete_frames && now > playout_time) {
214      framer_.ReleaseFrame(encoded_frame->frame_id);
215      continue;
216    }
217
218    // If |framer_| has a frame ready that is out of sequence, examine the
219    // playout time to determine whether it's acceptable to continue, thereby
220    // skipping one or more frames.  Skip if the missing frame wouldn't complete
221    // playing before the start of playback of the available frame.
222    if (!is_consecutively_next_frame) {
223      // TODO(miu): Also account for expected decode time here?
224      const base::TimeTicks earliest_possible_end_time_of_missing_frame =
225          now + expected_frame_duration_;
226      if (earliest_possible_end_time_of_missing_frame < playout_time) {
227        VLOG(1) << "Wait for next consecutive frame instead of skipping.";
228        if (!is_waiting_for_consecutive_frame_) {
229          is_waiting_for_consecutive_frame_ = true;
230          cast_environment_->PostDelayedTask(
231              CastEnvironment::MAIN,
232              FROM_HERE,
233              base::Bind(&FrameReceiver::EmitAvailableEncodedFramesAfterWaiting,
234                         weak_factory_.GetWeakPtr()),
235              playout_time - now);
236        }
237        return;
238      }
239    }
240
241    // Decrypt the payload data in the frame, if crypto is being used.
242    if (decryptor_.initialized()) {
243      std::string decrypted_data;
244      if (!decryptor_.Decrypt(encoded_frame->frame_id,
245                              encoded_frame->data,
246                              &decrypted_data)) {
247        // Decryption failed.  Give up on this frame.
248        framer_.ReleaseFrame(encoded_frame->frame_id);
249        continue;
250      }
251      encoded_frame->data.swap(decrypted_data);
252    }
253
254    // At this point, we have a decrypted EncodedFrame ready to be emitted.
255    encoded_frame->reference_time = playout_time;
256    framer_.ReleaseFrame(encoded_frame->frame_id);
257    cast_environment_->PostTask(CastEnvironment::MAIN,
258                                FROM_HERE,
259                                base::Bind(frame_request_queue_.front(),
260                                           base::Passed(&encoded_frame)));
261    frame_request_queue_.pop_front();
262  }
263}
264
265void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() {
266  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
267  DCHECK(is_waiting_for_consecutive_frame_);
268  is_waiting_for_consecutive_frame_ = false;
269  EmitAvailableEncodedFrames();
270}
271
272base::TimeTicks FrameReceiver::GetPlayoutTime(uint32 rtp_timestamp) const {
273  return lip_sync_reference_time_ +
274      lip_sync_drift_.Current() +
275      RtpDeltaToTimeDelta(
276          static_cast<int32>(rtp_timestamp - lip_sync_rtp_timestamp_),
277          rtp_timebase_) +
278      target_playout_delay_;
279}
280
281void FrameReceiver::ScheduleNextCastMessage() {
282  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
283  base::TimeTicks send_time;
284  framer_.TimeToSendNextCastMessage(&send_time);
285  base::TimeDelta time_to_send =
286      send_time - cast_environment_->Clock()->NowTicks();
287  time_to_send = std::max(
288      time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
289  cast_environment_->PostDelayedTask(
290      CastEnvironment::MAIN,
291      FROM_HERE,
292      base::Bind(&FrameReceiver::SendNextCastMessage,
293                 weak_factory_.GetWeakPtr()),
294      time_to_send);
295}
296
297void FrameReceiver::SendNextCastMessage() {
298  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
299  framer_.SendCastMessage();  // Will only send a message if it is time.
300  ScheduleNextCastMessage();
301}
302
303void FrameReceiver::ScheduleNextRtcpReport() {
304  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
305  base::TimeDelta time_to_next = rtcp_.TimeToSendNextRtcpReport() -
306                                 cast_environment_->Clock()->NowTicks();
307
308  time_to_next = std::max(
309      time_to_next, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
310
311  cast_environment_->PostDelayedTask(
312      CastEnvironment::MAIN,
313      FROM_HERE,
314      base::Bind(&FrameReceiver::SendNextRtcpReport,
315                 weak_factory_.GetWeakPtr()),
316      time_to_next);
317}
318
319void FrameReceiver::SendNextRtcpReport() {
320  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
321  rtcp_.SendRtcpFromRtpReceiver(NULL, NULL);
322  ScheduleNextRtcpReport();
323}
324
325}  // namespace cast
326}  // namespace media
327