1/*
2 *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/modules/rtp_rtcp/source/receive_statistics_impl.h"
12
13#include <math.h>
14
15#include "webrtc/base/scoped_ptr.h"
16#include "webrtc/modules/rtp_rtcp/source/bitrate.h"
17#include "webrtc/modules/rtp_rtcp/source/time_util.h"
18#include "webrtc/system_wrappers/include/critical_section_wrapper.h"
19
20namespace webrtc {
21
22const int64_t kStatisticsTimeoutMs = 8000;
23const int64_t kStatisticsProcessIntervalMs = 1000;
24
25StreamStatistician::~StreamStatistician() {}
26
27StreamStatisticianImpl::StreamStatisticianImpl(
28    Clock* clock,
29    RtcpStatisticsCallback* rtcp_callback,
30    StreamDataCountersCallback* rtp_callback)
31    : clock_(clock),
32      stream_lock_(CriticalSectionWrapper::CreateCriticalSection()),
33      incoming_bitrate_(clock, NULL),
34      ssrc_(0),
35      max_reordering_threshold_(kDefaultMaxReorderingThreshold),
36      jitter_q4_(0),
37      cumulative_loss_(0),
38      jitter_q4_transmission_time_offset_(0),
39      last_receive_time_ms_(0),
40      last_received_timestamp_(0),
41      last_received_transmission_time_offset_(0),
42      received_seq_first_(0),
43      received_seq_max_(0),
44      received_seq_wraps_(0),
45      received_packet_overhead_(12),
46      last_report_inorder_packets_(0),
47      last_report_old_packets_(0),
48      last_report_seq_max_(0),
49      rtcp_callback_(rtcp_callback),
50      rtp_callback_(rtp_callback) {}
51
52void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
53                                            size_t packet_length,
54                                            bool retransmitted) {
55  UpdateCounters(header, packet_length, retransmitted);
56  NotifyRtpCallback();
57}
58
59void StreamStatisticianImpl::UpdateCounters(const RTPHeader& header,
60                                            size_t packet_length,
61                                            bool retransmitted) {
62  CriticalSectionScoped cs(stream_lock_.get());
63  bool in_order = InOrderPacketInternal(header.sequenceNumber);
64  ssrc_ = header.ssrc;
65  incoming_bitrate_.Update(packet_length);
66  receive_counters_.transmitted.AddPacket(packet_length, header);
67  if (!in_order && retransmitted) {
68    receive_counters_.retransmitted.AddPacket(packet_length, header);
69  }
70
71  if (receive_counters_.transmitted.packets == 1) {
72    received_seq_first_ = header.sequenceNumber;
73    receive_counters_.first_packet_time_ms = clock_->TimeInMilliseconds();
74  }
75
76  // Count only the new packets received. That is, if packets 1, 2, 3, 5, 4, 6
77  // are received, 4 will be ignored.
78  if (in_order) {
79    // Current time in samples.
80    NtpTime receive_time(*clock_);
81
82    // Wrong if we use RetransmitOfOldPacket.
83    if (receive_counters_.transmitted.packets > 1 &&
84        received_seq_max_ > header.sequenceNumber) {
85      // Wrap around detected.
86      received_seq_wraps_++;
87    }
88    // New max.
89    received_seq_max_ = header.sequenceNumber;
90
91    // If new time stamp and more than one in-order packet received, calculate
92    // new jitter statistics.
93    if (header.timestamp != last_received_timestamp_ &&
94        (receive_counters_.transmitted.packets -
95         receive_counters_.retransmitted.packets) > 1) {
96      UpdateJitter(header, receive_time);
97    }
98    last_received_timestamp_ = header.timestamp;
99    last_receive_time_ntp_ = receive_time;
100    last_receive_time_ms_ = clock_->TimeInMilliseconds();
101  }
102
103  size_t packet_oh = header.headerLength + header.paddingLength;
104
105  // Our measured overhead. Filter from RFC 5104 4.2.1.2:
106  // avg_OH (new) = 15/16*avg_OH (old) + 1/16*pckt_OH,
107  received_packet_overhead_ = (15 * received_packet_overhead_ + packet_oh) >> 4;
108}
109
110void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header,
111                                          NtpTime receive_time) {
112  uint32_t receive_time_rtp =
113      NtpToRtp(receive_time, header.payload_type_frequency);
114  uint32_t last_receive_time_rtp =
115      NtpToRtp(last_receive_time_ntp_, header.payload_type_frequency);
116  int32_t time_diff_samples = (receive_time_rtp - last_receive_time_rtp) -
117      (header.timestamp - last_received_timestamp_);
118
119  time_diff_samples = abs(time_diff_samples);
120
121  // lib_jingle sometimes deliver crazy jumps in TS for the same stream.
122  // If this happens, don't update jitter value. Use 5 secs video frequency
123  // as the threshold.
124  if (time_diff_samples < 450000) {
125    // Note we calculate in Q4 to avoid using float.
126    int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
127    jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
128  }
129
130  // Extended jitter report, RFC 5450.
131  // Actual network jitter, excluding the source-introduced jitter.
132  int32_t time_diff_samples_ext =
133    (receive_time_rtp - last_receive_time_rtp) -
134    ((header.timestamp +
135      header.extension.transmissionTimeOffset) -
136     (last_received_timestamp_ +
137      last_received_transmission_time_offset_));
138
139  time_diff_samples_ext = abs(time_diff_samples_ext);
140
141  if (time_diff_samples_ext < 450000) {
142    int32_t jitter_diffQ4TransmissionTimeOffset =
143      (time_diff_samples_ext << 4) - jitter_q4_transmission_time_offset_;
144    jitter_q4_transmission_time_offset_ +=
145      ((jitter_diffQ4TransmissionTimeOffset + 8) >> 4);
146  }
147}
148
149void StreamStatisticianImpl::NotifyRtpCallback() {
150  StreamDataCounters data;
151  uint32_t ssrc;
152  {
153    CriticalSectionScoped cs(stream_lock_.get());
154    data = receive_counters_;
155    ssrc = ssrc_;
156  }
157  rtp_callback_->DataCountersUpdated(data, ssrc);
158}
159
160void StreamStatisticianImpl::NotifyRtcpCallback() {
161  RtcpStatistics data;
162  uint32_t ssrc;
163  {
164    CriticalSectionScoped cs(stream_lock_.get());
165    data = last_reported_statistics_;
166    ssrc = ssrc_;
167  }
168  rtcp_callback_->StatisticsUpdated(data, ssrc);
169}
170
171void StreamStatisticianImpl::FecPacketReceived(const RTPHeader& header,
172                                               size_t packet_length) {
173  {
174    CriticalSectionScoped cs(stream_lock_.get());
175    receive_counters_.fec.AddPacket(packet_length, header);
176  }
177  NotifyRtpCallback();
178}
179
180void StreamStatisticianImpl::SetMaxReorderingThreshold(
181    int max_reordering_threshold) {
182  CriticalSectionScoped cs(stream_lock_.get());
183  max_reordering_threshold_ = max_reordering_threshold;
184}
185
186bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
187                                           bool reset) {
188  {
189    CriticalSectionScoped cs(stream_lock_.get());
190    if (received_seq_first_ == 0 &&
191        receive_counters_.transmitted.payload_bytes == 0) {
192      // We have not received anything.
193      return false;
194    }
195
196    if (!reset) {
197      if (last_report_inorder_packets_ == 0) {
198        // No report.
199        return false;
200      }
201      // Just get last report.
202      *statistics = last_reported_statistics_;
203      return true;
204    }
205
206    *statistics = CalculateRtcpStatistics();
207  }
208
209  NotifyRtcpCallback();
210
211  return true;
212}
213
214RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
215  RtcpStatistics stats;
216
217  if (last_report_inorder_packets_ == 0) {
218    // First time we send a report.
219    last_report_seq_max_ = received_seq_first_ - 1;
220  }
221
222  // Calculate fraction lost.
223  uint16_t exp_since_last = (received_seq_max_ - last_report_seq_max_);
224
225  if (last_report_seq_max_ > received_seq_max_) {
226    // Can we assume that the seq_num can't go decrease over a full RTCP period?
227    exp_since_last = 0;
228  }
229
230  // Number of received RTP packets since last report, counts all packets but
231  // not re-transmissions.
232  uint32_t rec_since_last =
233      (receive_counters_.transmitted.packets -
234       receive_counters_.retransmitted.packets) - last_report_inorder_packets_;
235
236  // With NACK we don't know the expected retransmissions during the last
237  // second. We know how many "old" packets we have received. We just count
238  // the number of old received to estimate the loss, but it still does not
239  // guarantee an exact number since we run this based on time triggered by
240  // sending of an RTP packet. This should have a minimum effect.
241
242  // With NACK we don't count old packets as received since they are
243  // re-transmitted. We use RTT to decide if a packet is re-ordered or
244  // re-transmitted.
245  uint32_t retransmitted_packets =
246      receive_counters_.retransmitted.packets - last_report_old_packets_;
247  rec_since_last += retransmitted_packets;
248
249  int32_t missing = 0;
250  if (exp_since_last > rec_since_last) {
251    missing = (exp_since_last - rec_since_last);
252  }
253  uint8_t local_fraction_lost = 0;
254  if (exp_since_last) {
255    // Scale 0 to 255, where 255 is 100% loss.
256    local_fraction_lost =
257        static_cast<uint8_t>(255 * missing / exp_since_last);
258  }
259  stats.fraction_lost = local_fraction_lost;
260
261  // We need a counter for cumulative loss too.
262  // TODO(danilchap): Ensure cumulative loss is below maximum value of 2^24.
263  cumulative_loss_ += missing;
264  stats.cumulative_lost = cumulative_loss_;
265  stats.extended_max_sequence_number =
266      (received_seq_wraps_ << 16) + received_seq_max_;
267  // Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
268  stats.jitter = jitter_q4_ >> 4;
269
270  // Store this report.
271  last_reported_statistics_ = stats;
272
273  // Only for report blocks in RTCP SR and RR.
274  last_report_inorder_packets_ =
275      receive_counters_.transmitted.packets -
276      receive_counters_.retransmitted.packets;
277  last_report_old_packets_ = receive_counters_.retransmitted.packets;
278  last_report_seq_max_ = received_seq_max_;
279
280  return stats;
281}
282
283void StreamStatisticianImpl::GetDataCounters(
284    size_t* bytes_received, uint32_t* packets_received) const {
285  CriticalSectionScoped cs(stream_lock_.get());
286  if (bytes_received) {
287    *bytes_received = receive_counters_.transmitted.payload_bytes +
288                      receive_counters_.transmitted.header_bytes +
289                      receive_counters_.transmitted.padding_bytes;
290  }
291  if (packets_received) {
292    *packets_received = receive_counters_.transmitted.packets;
293  }
294}
295
296void StreamStatisticianImpl::GetReceiveStreamDataCounters(
297    StreamDataCounters* data_counters) const {
298  CriticalSectionScoped cs(stream_lock_.get());
299  *data_counters = receive_counters_;
300}
301
302uint32_t StreamStatisticianImpl::BitrateReceived() const {
303  CriticalSectionScoped cs(stream_lock_.get());
304  return incoming_bitrate_.BitrateNow();
305}
306
307void StreamStatisticianImpl::ProcessBitrate() {
308  CriticalSectionScoped cs(stream_lock_.get());
309  incoming_bitrate_.Process();
310}
311
312void StreamStatisticianImpl::LastReceiveTimeNtp(uint32_t* secs,
313                                                uint32_t* frac) const {
314  CriticalSectionScoped cs(stream_lock_.get());
315  *secs = last_receive_time_ntp_.seconds();
316  *frac = last_receive_time_ntp_.fractions();
317}
318
319bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
320    const RTPHeader& header, int64_t min_rtt) const {
321  CriticalSectionScoped cs(stream_lock_.get());
322  if (InOrderPacketInternal(header.sequenceNumber)) {
323    return false;
324  }
325  uint32_t frequency_khz = header.payload_type_frequency / 1000;
326  assert(frequency_khz > 0);
327
328  int64_t time_diff_ms = clock_->TimeInMilliseconds() -
329      last_receive_time_ms_;
330
331  // Diff in time stamp since last received in order.
332  uint32_t timestamp_diff = header.timestamp - last_received_timestamp_;
333  uint32_t rtp_time_stamp_diff_ms = timestamp_diff / frequency_khz;
334
335  int64_t max_delay_ms = 0;
336  if (min_rtt == 0) {
337    // Jitter standard deviation in samples.
338    float jitter_std = sqrt(static_cast<float>(jitter_q4_ >> 4));
339
340    // 2 times the standard deviation => 95% confidence.
341    // And transform to milliseconds by dividing by the frequency in kHz.
342    max_delay_ms = static_cast<int64_t>((2 * jitter_std) / frequency_khz);
343
344    // Min max_delay_ms is 1.
345    if (max_delay_ms == 0) {
346      max_delay_ms = 1;
347    }
348  } else {
349    max_delay_ms = (min_rtt / 3) + 1;
350  }
351  return time_diff_ms > rtp_time_stamp_diff_ms + max_delay_ms;
352}
353
354bool StreamStatisticianImpl::IsPacketInOrder(uint16_t sequence_number) const {
355  CriticalSectionScoped cs(stream_lock_.get());
356  return InOrderPacketInternal(sequence_number);
357}
358
359bool StreamStatisticianImpl::InOrderPacketInternal(
360    uint16_t sequence_number) const {
361  // First packet is always in order.
362  if (last_receive_time_ms_ == 0)
363    return true;
364
365  if (IsNewerSequenceNumber(sequence_number, received_seq_max_)) {
366    return true;
367  } else {
368    // If we have a restart of the remote side this packet is still in order.
369    return !IsNewerSequenceNumber(sequence_number, received_seq_max_ -
370                                  max_reordering_threshold_);
371  }
372}
373
374ReceiveStatistics* ReceiveStatistics::Create(Clock* clock) {
375  return new ReceiveStatisticsImpl(clock);
376}
377
378ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
379    : clock_(clock),
380      receive_statistics_lock_(CriticalSectionWrapper::CreateCriticalSection()),
381      last_rate_update_ms_(0),
382      rtcp_stats_callback_(NULL),
383      rtp_stats_callback_(NULL) {}
384
385ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
386  while (!statisticians_.empty()) {
387    delete statisticians_.begin()->second;
388    statisticians_.erase(statisticians_.begin());
389  }
390}
391
392void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
393                                           size_t packet_length,
394                                           bool retransmitted) {
395  StreamStatisticianImpl* impl;
396  {
397    CriticalSectionScoped cs(receive_statistics_lock_.get());
398    StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
399    if (it != statisticians_.end()) {
400      impl = it->second;
401    } else {
402      impl = new StreamStatisticianImpl(clock_, this, this);
403      statisticians_[header.ssrc] = impl;
404    }
405  }
406  // StreamStatisticianImpl instance is created once and only destroyed when
407  // this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
408  // it's own locking so don't hold receive_statistics_lock_ (potential
409  // deadlock).
410  impl->IncomingPacket(header, packet_length, retransmitted);
411}
412
413void ReceiveStatisticsImpl::FecPacketReceived(const RTPHeader& header,
414                                              size_t packet_length) {
415  CriticalSectionScoped cs(receive_statistics_lock_.get());
416  StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
417  // Ignore FEC if it is the first packet.
418  if (it != statisticians_.end()) {
419    it->second->FecPacketReceived(header, packet_length);
420  }
421}
422
423StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const {
424  CriticalSectionScoped cs(receive_statistics_lock_.get());
425  StatisticianMap active_statisticians;
426  for (StatisticianImplMap::const_iterator it = statisticians_.begin();
427       it != statisticians_.end(); ++it) {
428    uint32_t secs;
429    uint32_t frac;
430    it->second->LastReceiveTimeNtp(&secs, &frac);
431    if (clock_->CurrentNtpInMilliseconds() -
432        Clock::NtpToMs(secs, frac) < kStatisticsTimeoutMs) {
433      active_statisticians[it->first] = it->second;
434    }
435  }
436  return active_statisticians;
437}
438
439StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
440    uint32_t ssrc) const {
441  CriticalSectionScoped cs(receive_statistics_lock_.get());
442  StatisticianImplMap::const_iterator it = statisticians_.find(ssrc);
443  if (it == statisticians_.end())
444    return NULL;
445  return it->second;
446}
447
448void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
449    int max_reordering_threshold) {
450  CriticalSectionScoped cs(receive_statistics_lock_.get());
451  for (StatisticianImplMap::iterator it = statisticians_.begin();
452       it != statisticians_.end(); ++it) {
453    it->second->SetMaxReorderingThreshold(max_reordering_threshold);
454  }
455}
456
457int32_t ReceiveStatisticsImpl::Process() {
458  CriticalSectionScoped cs(receive_statistics_lock_.get());
459  for (StatisticianImplMap::iterator it = statisticians_.begin();
460       it != statisticians_.end(); ++it) {
461    it->second->ProcessBitrate();
462  }
463  last_rate_update_ms_ = clock_->TimeInMilliseconds();
464  return 0;
465}
466
467int64_t ReceiveStatisticsImpl::TimeUntilNextProcess() {
468  CriticalSectionScoped cs(receive_statistics_lock_.get());
469  int64_t time_since_last_update = clock_->TimeInMilliseconds() -
470      last_rate_update_ms_;
471  return std::max<int64_t>(
472      kStatisticsProcessIntervalMs - time_since_last_update, 0);
473}
474
475void ReceiveStatisticsImpl::RegisterRtcpStatisticsCallback(
476    RtcpStatisticsCallback* callback) {
477  CriticalSectionScoped cs(receive_statistics_lock_.get());
478  if (callback != NULL)
479    assert(rtcp_stats_callback_ == NULL);
480  rtcp_stats_callback_ = callback;
481}
482
483void ReceiveStatisticsImpl::StatisticsUpdated(const RtcpStatistics& statistics,
484                                              uint32_t ssrc) {
485  CriticalSectionScoped cs(receive_statistics_lock_.get());
486  if (rtcp_stats_callback_)
487    rtcp_stats_callback_->StatisticsUpdated(statistics, ssrc);
488}
489
490void ReceiveStatisticsImpl::CNameChanged(const char* cname, uint32_t ssrc) {
491  CriticalSectionScoped cs(receive_statistics_lock_.get());
492  if (rtcp_stats_callback_)
493    rtcp_stats_callback_->CNameChanged(cname, ssrc);
494}
495
496void ReceiveStatisticsImpl::RegisterRtpStatisticsCallback(
497    StreamDataCountersCallback* callback) {
498  CriticalSectionScoped cs(receive_statistics_lock_.get());
499  if (callback != NULL)
500    assert(rtp_stats_callback_ == NULL);
501  rtp_stats_callback_ = callback;
502}
503
504void ReceiveStatisticsImpl::DataCountersUpdated(const StreamDataCounters& stats,
505                                                uint32_t ssrc) {
506  CriticalSectionScoped cs(receive_statistics_lock_.get());
507  if (rtp_stats_callback_) {
508    rtp_stats_callback_->DataCountersUpdated(stats, ssrc);
509  }
510}
511
512void NullReceiveStatistics::IncomingPacket(const RTPHeader& rtp_header,
513                                           size_t packet_length,
514                                           bool retransmitted) {}
515
516void NullReceiveStatistics::FecPacketReceived(const RTPHeader& header,
517                                              size_t packet_length) {}
518
519StatisticianMap NullReceiveStatistics::GetActiveStatisticians() const {
520  return StatisticianMap();
521}
522
523StreamStatistician* NullReceiveStatistics::GetStatistician(
524    uint32_t ssrc) const {
525  return NULL;
526}
527
528void NullReceiveStatistics::SetMaxReorderingThreshold(
529    int max_reordering_threshold) {}
530
531int64_t NullReceiveStatistics::TimeUntilNextProcess() { return 0; }
532
533int32_t NullReceiveStatistics::Process() { return 0; }
534
535void NullReceiveStatistics::RegisterRtcpStatisticsCallback(
536    RtcpStatisticsCallback* callback) {}
537
538void NullReceiveStatistics::RegisterRtpStatisticsCallback(
539    StreamDataCountersCallback* callback) {}
540
541}  // namespace webrtc
542