1/*
2 *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/modules/rtp_rtcp/source/receive_statistics_impl.h"
12
13#include <math.h>
14
15#include "webrtc/modules/rtp_rtcp/source/bitrate.h"
16#include "webrtc/modules/rtp_rtcp/source/rtp_utility.h"
17#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
18#include "webrtc/system_wrappers/interface/scoped_ptr.h"
19
20namespace webrtc {
21
// A stream counts as active in GetActiveStatisticians() only while its last
// received packet is younger than this many milliseconds.
const int64_t kStatisticsTimeoutMs = 8 * 1000;
// Target spacing, in milliseconds, between two Process() rounds; used by
// TimeUntilNextProcess().
const int kStatisticsProcessIntervalMs = 1 * 1000;
24
25StreamStatistician::~StreamStatistician() {}
26
27StreamStatisticianImpl::StreamStatisticianImpl(
28    Clock* clock,
29    RtcpStatisticsCallback* rtcp_callback,
30    StreamDataCountersCallback* rtp_callback)
31    : clock_(clock),
32      stream_lock_(CriticalSectionWrapper::CreateCriticalSection()),
33      incoming_bitrate_(clock, NULL),
34      ssrc_(0),
35      max_reordering_threshold_(kDefaultMaxReorderingThreshold),
36      jitter_q4_(0),
37      cumulative_loss_(0),
38      jitter_q4_transmission_time_offset_(0),
39      last_receive_time_ms_(0),
40      last_receive_time_secs_(0),
41      last_receive_time_frac_(0),
42      last_received_timestamp_(0),
43      last_received_transmission_time_offset_(0),
44      received_seq_first_(0),
45      received_seq_max_(0),
46      received_seq_wraps_(0),
47      received_packet_overhead_(12),
48      last_report_inorder_packets_(0),
49      last_report_old_packets_(0),
50      last_report_seq_max_(0),
51      rtcp_callback_(rtcp_callback),
52      rtp_callback_(rtp_callback) {}
53
54void StreamStatisticianImpl::ResetStatistics() {
55  CriticalSectionScoped cs(stream_lock_.get());
56  last_report_inorder_packets_ = 0;
57  last_report_old_packets_ = 0;
58  last_report_seq_max_ = 0;
59  last_reported_statistics_ = RtcpStatistics();
60  jitter_q4_ = 0;
61  cumulative_loss_ = 0;
62  jitter_q4_transmission_time_offset_ = 0;
63  received_seq_wraps_ = 0;
64  received_seq_max_ = 0;
65  received_seq_first_ = 0;
66  receive_counters_ = StreamDataCounters();
67}
68
69void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
70                                            size_t bytes,
71                                            bool retransmitted) {
72  UpdateCounters(header, bytes, retransmitted);
73  NotifyRtpCallback();
74}
75
76void StreamStatisticianImpl::UpdateCounters(const RTPHeader& header,
77                                            size_t bytes,
78                                            bool retransmitted) {
79  CriticalSectionScoped cs(stream_lock_.get());
80  bool in_order = InOrderPacketInternal(header.sequenceNumber);
81  ssrc_ = header.ssrc;
82  incoming_bitrate_.Update(bytes);
83  receive_counters_.bytes +=
84      bytes - (header.paddingLength + header.headerLength);
85  receive_counters_.header_bytes += header.headerLength;
86  receive_counters_.padding_bytes += header.paddingLength;
87  ++receive_counters_.packets;
88  if (!in_order && retransmitted) {
89    ++receive_counters_.retransmitted_packets;
90  }
91
92  if (receive_counters_.packets == 1) {
93    received_seq_first_ = header.sequenceNumber;
94  }
95
96  // Count only the new packets received. That is, if packets 1, 2, 3, 5, 4, 6
97  // are received, 4 will be ignored.
98  if (in_order) {
99    // Current time in samples.
100    uint32_t receive_time_secs;
101    uint32_t receive_time_frac;
102    clock_->CurrentNtp(receive_time_secs, receive_time_frac);
103
104    // Wrong if we use RetransmitOfOldPacket.
105    if (receive_counters_.packets > 1 &&
106        received_seq_max_ > header.sequenceNumber) {
107      // Wrap around detected.
108      received_seq_wraps_++;
109    }
110    // New max.
111    received_seq_max_ = header.sequenceNumber;
112
113    // If new time stamp and more than one in-order packet received, calculate
114    // new jitter statistics.
115    if (header.timestamp != last_received_timestamp_ &&
116        (receive_counters_.packets - receive_counters_.retransmitted_packets) >
117            1) {
118      UpdateJitter(header, receive_time_secs, receive_time_frac);
119    }
120    last_received_timestamp_ = header.timestamp;
121    last_receive_time_secs_ = receive_time_secs;
122    last_receive_time_frac_ = receive_time_frac;
123    last_receive_time_ms_ = clock_->TimeInMilliseconds();
124  }
125
126  uint16_t packet_oh = header.headerLength + header.paddingLength;
127
128  // Our measured overhead. Filter from RFC 5104 4.2.1.2:
129  // avg_OH (new) = 15/16*avg_OH (old) + 1/16*pckt_OH,
130  received_packet_overhead_ = (15 * received_packet_overhead_ + packet_oh) >> 4;
131}
132
133void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header,
134                                          uint32_t receive_time_secs,
135                                          uint32_t receive_time_frac) {
136  uint32_t receive_time_rtp = RtpUtility::ConvertNTPTimeToRTP(
137      receive_time_secs, receive_time_frac, header.payload_type_frequency);
138  uint32_t last_receive_time_rtp =
139      RtpUtility::ConvertNTPTimeToRTP(last_receive_time_secs_,
140                                      last_receive_time_frac_,
141                                      header.payload_type_frequency);
142  int32_t time_diff_samples = (receive_time_rtp - last_receive_time_rtp) -
143      (header.timestamp - last_received_timestamp_);
144
145  time_diff_samples = abs(time_diff_samples);
146
147  // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
148  // If this happens, don't update jitter value. Use 5 secs video frequency
149  // as the threshold.
150  if (time_diff_samples < 450000) {
151    // Note we calculate in Q4 to avoid using float.
152    int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
153    jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
154  }
155
156  // Extended jitter report, RFC 5450.
157  // Actual network jitter, excluding the source-introduced jitter.
158  int32_t time_diff_samples_ext =
159    (receive_time_rtp - last_receive_time_rtp) -
160    ((header.timestamp +
161      header.extension.transmissionTimeOffset) -
162     (last_received_timestamp_ +
163      last_received_transmission_time_offset_));
164
165  time_diff_samples_ext = abs(time_diff_samples_ext);
166
167  if (time_diff_samples_ext < 450000) {
168    int32_t jitter_diffQ4TransmissionTimeOffset =
169      (time_diff_samples_ext << 4) - jitter_q4_transmission_time_offset_;
170    jitter_q4_transmission_time_offset_ +=
171      ((jitter_diffQ4TransmissionTimeOffset + 8) >> 4);
172  }
173}
174
175void StreamStatisticianImpl::NotifyRtpCallback() {
176  StreamDataCounters data;
177  uint32_t ssrc;
178  {
179    CriticalSectionScoped cs(stream_lock_.get());
180    data = receive_counters_;
181    ssrc = ssrc_;
182  }
183  rtp_callback_->DataCountersUpdated(data, ssrc);
184}
185
186void StreamStatisticianImpl::NotifyRtcpCallback() {
187  RtcpStatistics data;
188  uint32_t ssrc;
189  {
190    CriticalSectionScoped cs(stream_lock_.get());
191    data = last_reported_statistics_;
192    ssrc = ssrc_;
193  }
194  rtcp_callback_->StatisticsUpdated(data, ssrc);
195}
196
197void StreamStatisticianImpl::FecPacketReceived() {
198  {
199    CriticalSectionScoped cs(stream_lock_.get());
200    ++receive_counters_.fec_packets;
201  }
202  NotifyRtpCallback();
203}
204
205void StreamStatisticianImpl::SetMaxReorderingThreshold(
206    int max_reordering_threshold) {
207  CriticalSectionScoped cs(stream_lock_.get());
208  max_reordering_threshold_ = max_reordering_threshold;
209}
210
211bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
212                                           bool reset) {
213  {
214    CriticalSectionScoped cs(stream_lock_.get());
215    if (received_seq_first_ == 0 && receive_counters_.bytes == 0) {
216      // We have not received anything.
217      return false;
218    }
219
220    if (!reset) {
221      if (last_report_inorder_packets_ == 0) {
222        // No report.
223        return false;
224      }
225      // Just get last report.
226      *statistics = last_reported_statistics_;
227      return true;
228    }
229
230    *statistics = CalculateRtcpStatistics();
231  }
232
233  NotifyRtcpCallback();
234
235  return true;
236}
237
238RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
239  RtcpStatistics stats;
240
241  if (last_report_inorder_packets_ == 0) {
242    // First time we send a report.
243    last_report_seq_max_ = received_seq_first_ - 1;
244  }
245
246  // Calculate fraction lost.
247  uint16_t exp_since_last = (received_seq_max_ - last_report_seq_max_);
248
249  if (last_report_seq_max_ > received_seq_max_) {
250    // Can we assume that the seq_num can't go decrease over a full RTCP period?
251    exp_since_last = 0;
252  }
253
254  // Number of received RTP packets since last report, counts all packets but
255  // not re-transmissions.
256  uint32_t rec_since_last =
257      (receive_counters_.packets - receive_counters_.retransmitted_packets) -
258      last_report_inorder_packets_;
259
260  // With NACK we don't know the expected retransmissions during the last
261  // second. We know how many "old" packets we have received. We just count
262  // the number of old received to estimate the loss, but it still does not
263  // guarantee an exact number since we run this based on time triggered by
264  // sending of an RTP packet. This should have a minimum effect.
265
266  // With NACK we don't count old packets as received since they are
267  // re-transmitted. We use RTT to decide if a packet is re-ordered or
268  // re-transmitted.
269  uint32_t retransmitted_packets =
270      receive_counters_.retransmitted_packets - last_report_old_packets_;
271  rec_since_last += retransmitted_packets;
272
273  int32_t missing = 0;
274  if (exp_since_last > rec_since_last) {
275    missing = (exp_since_last - rec_since_last);
276  }
277  uint8_t local_fraction_lost = 0;
278  if (exp_since_last) {
279    // Scale 0 to 255, where 255 is 100% loss.
280    local_fraction_lost =
281        static_cast<uint8_t>(255 * missing / exp_since_last);
282  }
283  stats.fraction_lost = local_fraction_lost;
284
285  // We need a counter for cumulative loss too.
286  cumulative_loss_ += missing;
287  stats.cumulative_lost = cumulative_loss_;
288  stats.extended_max_sequence_number =
289      (received_seq_wraps_ << 16) + received_seq_max_;
290  // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
291  stats.jitter = jitter_q4_ >> 4;
292
293  // Store this report.
294  last_reported_statistics_ = stats;
295
296  // Only for report blocks in RTCP SR and RR.
297  last_report_inorder_packets_ =
298      receive_counters_.packets - receive_counters_.retransmitted_packets;
299  last_report_old_packets_ = receive_counters_.retransmitted_packets;
300  last_report_seq_max_ = received_seq_max_;
301
302  return stats;
303}
304
305void StreamStatisticianImpl::GetDataCounters(
306    uint32_t* bytes_received, uint32_t* packets_received) const {
307  CriticalSectionScoped cs(stream_lock_.get());
308  if (bytes_received) {
309    *bytes_received = receive_counters_.bytes + receive_counters_.header_bytes +
310                      receive_counters_.padding_bytes;
311  }
312  if (packets_received) {
313    *packets_received = receive_counters_.packets;
314  }
315}
316
317uint32_t StreamStatisticianImpl::BitrateReceived() const {
318  CriticalSectionScoped cs(stream_lock_.get());
319  return incoming_bitrate_.BitrateNow();
320}
321
322void StreamStatisticianImpl::ProcessBitrate() {
323  CriticalSectionScoped cs(stream_lock_.get());
324  incoming_bitrate_.Process();
325}
326
327void StreamStatisticianImpl::LastReceiveTimeNtp(uint32_t* secs,
328                                                uint32_t* frac) const {
329  CriticalSectionScoped cs(stream_lock_.get());
330  *secs = last_receive_time_secs_;
331  *frac = last_receive_time_frac_;
332}
333
334bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
335    const RTPHeader& header, int min_rtt) const {
336  CriticalSectionScoped cs(stream_lock_.get());
337  if (InOrderPacketInternal(header.sequenceNumber)) {
338    return false;
339  }
340  uint32_t frequency_khz = header.payload_type_frequency / 1000;
341  assert(frequency_khz > 0);
342
343  int64_t time_diff_ms = clock_->TimeInMilliseconds() -
344      last_receive_time_ms_;
345
346  // Diff in time stamp since last received in order.
347  uint32_t timestamp_diff = header.timestamp - last_received_timestamp_;
348  int32_t rtp_time_stamp_diff_ms = static_cast<int32_t>(timestamp_diff) /
349      frequency_khz;
350
351  int32_t max_delay_ms = 0;
352  if (min_rtt == 0) {
353    // Jitter standard deviation in samples.
354    float jitter_std = sqrt(static_cast<float>(jitter_q4_ >> 4));
355
356    // 2 times the standard deviation => 95% confidence.
357    // And transform to milliseconds by dividing by the frequency in kHz.
358    max_delay_ms = static_cast<int32_t>((2 * jitter_std) / frequency_khz);
359
360    // Min max_delay_ms is 1.
361    if (max_delay_ms == 0) {
362      max_delay_ms = 1;
363    }
364  } else {
365    max_delay_ms = (min_rtt / 3) + 1;
366  }
367  return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
368}
369
370bool StreamStatisticianImpl::IsPacketInOrder(uint16_t sequence_number) const {
371  CriticalSectionScoped cs(stream_lock_.get());
372  return InOrderPacketInternal(sequence_number);
373}
374
375bool StreamStatisticianImpl::InOrderPacketInternal(
376    uint16_t sequence_number) const {
377  // First packet is always in order.
378  if (last_receive_time_ms_ == 0)
379    return true;
380
381  if (IsNewerSequenceNumber(sequence_number, received_seq_max_)) {
382    return true;
383  } else {
384    // If we have a restart of the remote side this packet is still in order.
385    return !IsNewerSequenceNumber(sequence_number, received_seq_max_ -
386                                  max_reordering_threshold_);
387  }
388}
389
390ReceiveStatistics* ReceiveStatistics::Create(Clock* clock) {
391  return new ReceiveStatisticsImpl(clock);
392}
393
394ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
395    : clock_(clock),
396      receive_statistics_lock_(CriticalSectionWrapper::CreateCriticalSection()),
397      last_rate_update_ms_(0),
398      rtcp_stats_callback_(NULL),
399      rtp_stats_callback_(NULL) {}
400
401ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
402  while (!statisticians_.empty()) {
403    delete statisticians_.begin()->second;
404    statisticians_.erase(statisticians_.begin());
405  }
406}
407
408void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
409                                           size_t bytes,
410                                           bool retransmitted) {
411  StreamStatisticianImpl* impl;
412  {
413    CriticalSectionScoped cs(receive_statistics_lock_.get());
414    StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
415    if (it != statisticians_.end()) {
416      impl = it->second;
417    } else {
418      impl = new StreamStatisticianImpl(clock_, this, this);
419      statisticians_[header.ssrc] = impl;
420    }
421  }
422  // StreamStatisticianImpl instance is created once and only destroyed when
423  // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
424  // it's own locking so don't hold receive_statistics_lock_ (potential
425  // deadlock).
426  impl->IncomingPacket(header, bytes, retransmitted);
427}
428
429void ReceiveStatisticsImpl::FecPacketReceived(uint32_t ssrc) {
430  CriticalSectionScoped cs(receive_statistics_lock_.get());
431  StatisticianImplMap::iterator it = statisticians_.find(ssrc);
432  // Ignore FEC if it is the first packet.
433  if (it != statisticians_.end()) {
434    it->second->FecPacketReceived();
435  }
436}
437
438StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const {
439  CriticalSectionScoped cs(receive_statistics_lock_.get());
440  StatisticianMap active_statisticians;
441  for (StatisticianImplMap::const_iterator it = statisticians_.begin();
442       it != statisticians_.end(); ++it) {
443    uint32_t secs;
444    uint32_t frac;
445    it->second->LastReceiveTimeNtp(&secs, &frac);
446    if (clock_->CurrentNtpInMilliseconds() -
447        Clock::NtpToMs(secs, frac) < kStatisticsTimeoutMs) {
448      active_statisticians[it->first] = it->second;
449    }
450  }
451  return active_statisticians;
452}
453
454StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
455    uint32_t ssrc) const {
456  CriticalSectionScoped cs(receive_statistics_lock_.get());
457  StatisticianImplMap::const_iterator it = statisticians_.find(ssrc);
458  if (it == statisticians_.end())
459    return NULL;
460  return it->second;
461}
462
463void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
464    int max_reordering_threshold) {
465  CriticalSectionScoped cs(receive_statistics_lock_.get());
466  for (StatisticianImplMap::iterator it = statisticians_.begin();
467       it != statisticians_.end(); ++it) {
468    it->second->SetMaxReorderingThreshold(max_reordering_threshold);
469  }
470}
471
472int32_t ReceiveStatisticsImpl::Process() {
473  CriticalSectionScoped cs(receive_statistics_lock_.get());
474  for (StatisticianImplMap::iterator it = statisticians_.begin();
475       it != statisticians_.end(); ++it) {
476    it->second->ProcessBitrate();
477  }
478  last_rate_update_ms_ = clock_->TimeInMilliseconds();
479  return 0;
480}
481
482int32_t ReceiveStatisticsImpl::TimeUntilNextProcess() {
483  CriticalSectionScoped cs(receive_statistics_lock_.get());
484  int time_since_last_update = clock_->TimeInMilliseconds() -
485      last_rate_update_ms_;
486  return std::max(kStatisticsProcessIntervalMs - time_since_last_update, 0);
487}
488
489void ReceiveStatisticsImpl::RegisterRtcpStatisticsCallback(
490    RtcpStatisticsCallback* callback) {
491  CriticalSectionScoped cs(receive_statistics_lock_.get());
492  if (callback != NULL)
493    assert(rtcp_stats_callback_ == NULL);
494  rtcp_stats_callback_ = callback;
495}
496
497void ReceiveStatisticsImpl::StatisticsUpdated(const RtcpStatistics& statistics,
498                                              uint32_t ssrc) {
499  CriticalSectionScoped cs(receive_statistics_lock_.get());
500  if (rtcp_stats_callback_) {
501    rtcp_stats_callback_->StatisticsUpdated(statistics, ssrc);
502  }
503}
504
505void ReceiveStatisticsImpl::RegisterRtpStatisticsCallback(
506    StreamDataCountersCallback* callback) {
507  CriticalSectionScoped cs(receive_statistics_lock_.get());
508  if (callback != NULL)
509    assert(rtp_stats_callback_ == NULL);
510  rtp_stats_callback_ = callback;
511}
512
513void ReceiveStatisticsImpl::DataCountersUpdated(const StreamDataCounters& stats,
514                                                uint32_t ssrc) {
515  CriticalSectionScoped cs(receive_statistics_lock_.get());
516  if (rtp_stats_callback_) {
517    rtp_stats_callback_->DataCountersUpdated(stats, ssrc);
518  }
519}
520
521void NullReceiveStatistics::IncomingPacket(const RTPHeader& rtp_header,
522                                           size_t bytes,
523                                           bool retransmitted) {}
524
525void NullReceiveStatistics::FecPacketReceived(uint32_t ssrc) {}
526
527StatisticianMap NullReceiveStatistics::GetActiveStatisticians() const {
528  return StatisticianMap();
529}
530
531StreamStatistician* NullReceiveStatistics::GetStatistician(
532    uint32_t ssrc) const {
533  return NULL;
534}
535
536void NullReceiveStatistics::SetMaxReorderingThreshold(
537    int max_reordering_threshold) {}
538
539int32_t NullReceiveStatistics::TimeUntilNextProcess() { return 0; }
540
541int32_t NullReceiveStatistics::Process() { return 0; }
542
543void NullReceiveStatistics::RegisterRtcpStatisticsCallback(
544    RtcpStatisticsCallback* callback) {}
545
546void NullReceiveStatistics::RegisterRtpStatisticsCallback(
547    StreamDataCountersCallback* callback) {}
548
549}  // namespace webrtc
550