1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "media/cast/receiver/frame_receiver.h"
6
7#include <algorithm>
8
9#include "base/big_endian.h"
10#include "base/bind.h"
11#include "base/logging.h"
12#include "base/message_loop/message_loop.h"
13#include "media/cast/cast_environment.h"
14
namespace {

// Lower bound, in milliseconds, for the delay of the periodic Cast feedback
// and RTCP report tasks scheduled in this file.  The scheduling functions
// clamp their computed delay to at least this value.
const int kMinSchedulingDelayMs = 1;

}  // namespace
18
19namespace media {
20namespace cast {
21
22FrameReceiver::FrameReceiver(
23    const scoped_refptr<CastEnvironment>& cast_environment,
24    const FrameReceiverConfig& config,
25    EventMediaType event_media_type,
26    PacedPacketSender* const packet_sender)
27    : cast_environment_(cast_environment),
28      packet_parser_(config.incoming_ssrc, config.rtp_payload_type),
29      stats_(cast_environment->Clock()),
30      event_media_type_(event_media_type),
31      event_subscriber_(kReceiverRtcpEventHistorySize, event_media_type),
32      rtp_timebase_(config.frequency),
33      target_playout_delay_(
34          base::TimeDelta::FromMilliseconds(config.rtp_max_delay_ms)),
35      expected_frame_duration_(
36          base::TimeDelta::FromSeconds(1) / config.max_frame_rate),
37      reports_are_scheduled_(false),
38      framer_(cast_environment->Clock(),
39              this,
40              config.incoming_ssrc,
41              true,
42              config.rtp_max_delay_ms * config.max_frame_rate / 1000),
43      rtcp_(RtcpCastMessageCallback(),
44            RtcpRttCallback(),
45            RtcpLogMessageCallback(),
46            cast_environment_->Clock(),
47            packet_sender,
48            config.feedback_ssrc,
49            config.incoming_ssrc),
50      is_waiting_for_consecutive_frame_(false),
51      lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()),
52      rtcp_interval_(base::TimeDelta::FromMilliseconds(config.rtcp_interval)),
53      weak_factory_(this) {
54  DCHECK_GT(config.rtp_max_delay_ms, 0);
55  DCHECK_GT(config.max_frame_rate, 0);
56  decryptor_.Initialize(config.aes_key, config.aes_iv_mask);
57  cast_environment_->Logging()->AddRawEventSubscriber(&event_subscriber_);
58  memset(frame_id_to_rtp_timestamp_, 0, sizeof(frame_id_to_rtp_timestamp_));
59}
60
61FrameReceiver::~FrameReceiver() {
62  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
63  cast_environment_->Logging()->RemoveRawEventSubscriber(&event_subscriber_);
64}
65
66void FrameReceiver::RequestEncodedFrame(
67    const ReceiveEncodedFrameCallback& callback) {
68  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
69  frame_request_queue_.push_back(callback);
70  EmitAvailableEncodedFrames();
71}
72
73bool FrameReceiver::ProcessPacket(scoped_ptr<Packet> packet) {
74  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
75
76  if (Rtcp::IsRtcpPacket(&packet->front(), packet->size())) {
77    rtcp_.IncomingRtcpPacket(&packet->front(), packet->size());
78  } else {
79    RtpCastHeader rtp_header;
80    const uint8* payload_data;
81    size_t payload_size;
82    if (!packet_parser_.ParsePacket(&packet->front(),
83                                    packet->size(),
84                                    &rtp_header,
85                                    &payload_data,
86                                    &payload_size)) {
87      return false;
88    }
89
90    ProcessParsedPacket(rtp_header, payload_data, payload_size);
91    stats_.UpdateStatistics(rtp_header);
92  }
93
94  if (!reports_are_scheduled_) {
95    ScheduleNextRtcpReport();
96    ScheduleNextCastMessage();
97    reports_are_scheduled_ = true;
98  }
99
100  return true;
101}
102
103// static
104bool FrameReceiver::ParseSenderSsrc(const uint8* packet,
105                                    size_t length,
106                                    uint32* ssrc) {
107  base::BigEndianReader big_endian_reader(
108      reinterpret_cast<const char*>(packet), length);
109  return big_endian_reader.Skip(8) && big_endian_reader.ReadU32(ssrc);
110}
111
112void FrameReceiver::ProcessParsedPacket(const RtpCastHeader& rtp_header,
113                                        const uint8* payload_data,
114                                        size_t payload_size) {
115  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
116
117  const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
118
119  frame_id_to_rtp_timestamp_[rtp_header.frame_id & 0xff] =
120      rtp_header.rtp_timestamp;
121  cast_environment_->Logging()->InsertPacketEvent(
122      now, PACKET_RECEIVED, event_media_type_, rtp_header.rtp_timestamp,
123      rtp_header.frame_id, rtp_header.packet_id, rtp_header.max_packet_id,
124      payload_size);
125
126  bool duplicate = false;
127  const bool complete =
128      framer_.InsertPacket(payload_data, payload_size, rtp_header, &duplicate);
129
130  // Duplicate packets are ignored.
131  if (duplicate)
132    return;
133
134  // Update lip-sync values upon receiving the first packet of each frame, or if
135  // they have never been set yet.
136  if (rtp_header.packet_id == 0 || lip_sync_reference_time_.is_null()) {
137    RtpTimestamp fresh_sync_rtp;
138    base::TimeTicks fresh_sync_reference;
139    if (!rtcp_.GetLatestLipSyncTimes(&fresh_sync_rtp, &fresh_sync_reference)) {
140      // HACK: The sender should have provided Sender Reports before the first
141      // frame was sent.  However, the spec does not currently require this.
142      // Therefore, when the data is missing, the local clock is used to
143      // generate reference timestamps.
144      VLOG(2) << "Lip sync info missing.  Falling-back to local clock.";
145      fresh_sync_rtp = rtp_header.rtp_timestamp;
146      fresh_sync_reference = now;
147    }
148    // |lip_sync_reference_time_| is always incremented according to the time
149    // delta computed from the difference in RTP timestamps.  Then,
150    // |lip_sync_drift_| accounts for clock drift and also smoothes-out any
151    // sudden/discontinuous shifts in the series of reference time values.
152    if (lip_sync_reference_time_.is_null()) {
153      lip_sync_reference_time_ = fresh_sync_reference;
154    } else {
155      lip_sync_reference_time_ += RtpDeltaToTimeDelta(
156          static_cast<int32>(fresh_sync_rtp - lip_sync_rtp_timestamp_),
157          rtp_timebase_);
158    }
159    lip_sync_rtp_timestamp_ = fresh_sync_rtp;
160    lip_sync_drift_.Update(
161        now, fresh_sync_reference - lip_sync_reference_time_);
162  }
163
164  // Another frame is complete from a non-duplicate packet.  Attempt to emit
165  // more frames to satisfy enqueued requests.
166  if (complete)
167    EmitAvailableEncodedFrames();
168}
169
170void FrameReceiver::CastFeedback(const RtcpCastMessage& cast_message) {
171  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
172
173  base::TimeTicks now = cast_environment_->Clock()->NowTicks();
174  RtpTimestamp rtp_timestamp =
175      frame_id_to_rtp_timestamp_[cast_message.ack_frame_id & 0xff];
176  cast_environment_->Logging()->InsertFrameEvent(
177      now, FRAME_ACK_SENT, event_media_type_,
178      rtp_timestamp, cast_message.ack_frame_id);
179
180  ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events;
181  event_subscriber_.GetRtcpEventsAndReset(&rtcp_events);
182  rtcp_.SendRtcpFromRtpReceiver(&cast_message, target_playout_delay_,
183                                &rtcp_events, NULL);
184}
185
186void FrameReceiver::EmitAvailableEncodedFrames() {
187  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
188
189  while (!frame_request_queue_.empty()) {
190    // Attempt to peek at the next completed frame from the |framer_|.
191    // TODO(miu): We should only be peeking at the metadata, and not copying the
192    // payload yet!  Or, at least, peek using a StringPiece instead of a copy.
193    scoped_ptr<EncodedFrame> encoded_frame(
194        new EncodedFrame());
195    bool is_consecutively_next_frame = false;
196    bool have_multiple_complete_frames = false;
197    if (!framer_.GetEncodedFrame(encoded_frame.get(),
198                                 &is_consecutively_next_frame,
199                                 &have_multiple_complete_frames)) {
200      VLOG(1) << "Wait for more packets to produce a completed frame.";
201      return;  // ProcessParsedPacket() will invoke this method in the future.
202    }
203
204    const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
205    const base::TimeTicks playout_time = GetPlayoutTime(*encoded_frame);
206
207    // If we have multiple decodable frames, and the current frame is
208    // too old, then skip it and decode the next frame instead.
209    if (have_multiple_complete_frames && now > playout_time) {
210      framer_.ReleaseFrame(encoded_frame->frame_id);
211      continue;
212    }
213
214    // If |framer_| has a frame ready that is out of sequence, examine the
215    // playout time to determine whether it's acceptable to continue, thereby
216    // skipping one or more frames.  Skip if the missing frame wouldn't complete
217    // playing before the start of playback of the available frame.
218    if (!is_consecutively_next_frame) {
219      // This assumes that decoding takes as long as playing, which might
220      // not be true.
221      const base::TimeTicks earliest_possible_end_time_of_missing_frame =
222          now + expected_frame_duration_ * 2;
223      if (earliest_possible_end_time_of_missing_frame < playout_time) {
224        VLOG(1) << "Wait for next consecutive frame instead of skipping.";
225        if (!is_waiting_for_consecutive_frame_) {
226          is_waiting_for_consecutive_frame_ = true;
227          cast_environment_->PostDelayedTask(
228              CastEnvironment::MAIN,
229              FROM_HERE,
230              base::Bind(&FrameReceiver::EmitAvailableEncodedFramesAfterWaiting,
231                         weak_factory_.GetWeakPtr()),
232              playout_time - now);
233        }
234        return;
235      }
236    }
237
238    // At this point, we have the complete next frame, or a decodable
239    // frame from somewhere later in the stream, AND we have given up
240    // on waiting for any frames in between, so now we can ACK the frame.
241    framer_.AckFrame(encoded_frame->frame_id);
242
243    // Decrypt the payload data in the frame, if crypto is being used.
244    if (decryptor_.is_activated()) {
245      std::string decrypted_data;
246      if (!decryptor_.Decrypt(encoded_frame->frame_id,
247                              encoded_frame->data,
248                              &decrypted_data)) {
249        // Decryption failed.  Give up on this frame.
250        framer_.ReleaseFrame(encoded_frame->frame_id);
251        continue;
252      }
253      encoded_frame->data.swap(decrypted_data);
254    }
255
256    // At this point, we have a decrypted EncodedFrame ready to be emitted.
257    encoded_frame->reference_time = playout_time;
258    framer_.ReleaseFrame(encoded_frame->frame_id);
259    if (encoded_frame->new_playout_delay_ms) {
260      target_playout_delay_ = base::TimeDelta::FromMilliseconds(
261          encoded_frame->new_playout_delay_ms);
262    }
263    cast_environment_->PostTask(CastEnvironment::MAIN,
264                                FROM_HERE,
265                                base::Bind(&FrameReceiver::EmitOneFrame,
266                                           weak_factory_.GetWeakPtr(),
267                                           frame_request_queue_.front(),
268                                           base::Passed(&encoded_frame)));
269    frame_request_queue_.pop_front();
270  }
271}
272
273void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() {
274  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
275  DCHECK(is_waiting_for_consecutive_frame_);
276  is_waiting_for_consecutive_frame_ = false;
277  EmitAvailableEncodedFrames();
278}
279
280void FrameReceiver::EmitOneFrame(const ReceiveEncodedFrameCallback& callback,
281                                 scoped_ptr<EncodedFrame> encoded_frame) const {
282  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
283  if (!callback.is_null())
284    callback.Run(encoded_frame.Pass());
285}
286
287base::TimeTicks FrameReceiver::GetPlayoutTime(const EncodedFrame& frame) const {
288  base::TimeDelta target_playout_delay = target_playout_delay_;
289  if (frame.new_playout_delay_ms) {
290    target_playout_delay = base::TimeDelta::FromMilliseconds(
291        frame.new_playout_delay_ms);
292  }
293  return lip_sync_reference_time_ +
294      lip_sync_drift_.Current() +
295      RtpDeltaToTimeDelta(
296          static_cast<int32>(frame.rtp_timestamp - lip_sync_rtp_timestamp_),
297          rtp_timebase_) +
298      target_playout_delay;
299}
300
301void FrameReceiver::ScheduleNextCastMessage() {
302  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
303  base::TimeTicks send_time;
304  framer_.TimeToSendNextCastMessage(&send_time);
305  base::TimeDelta time_to_send =
306      send_time - cast_environment_->Clock()->NowTicks();
307  time_to_send = std::max(
308      time_to_send, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
309  cast_environment_->PostDelayedTask(
310      CastEnvironment::MAIN,
311      FROM_HERE,
312      base::Bind(&FrameReceiver::SendNextCastMessage,
313                 weak_factory_.GetWeakPtr()),
314      time_to_send);
315}
316
317void FrameReceiver::SendNextCastMessage() {
318  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
319  framer_.SendCastMessage();  // Will only send a message if it is time.
320  ScheduleNextCastMessage();
321}
322
323void FrameReceiver::ScheduleNextRtcpReport() {
324  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
325  base::TimeDelta time_to_next = rtcp_interval_;
326  time_to_next = std::max(
327      time_to_next, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
328
329  cast_environment_->PostDelayedTask(
330      CastEnvironment::MAIN,
331      FROM_HERE,
332      base::Bind(&FrameReceiver::SendNextRtcpReport,
333                 weak_factory_.GetWeakPtr()),
334      time_to_next);
335}
336
337void FrameReceiver::SendNextRtcpReport() {
338  DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
339  rtcp_.SendRtcpFromRtpReceiver(NULL, base::TimeDelta(), NULL, &stats_);
340  ScheduleNextRtcpReport();
341}
342
343}  // namespace cast
344}  // namespace media
345