1/*
2 *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/modules/pacing/paced_sender.h"
12
13#include <map>
14#include <queue>
15#include <set>
16
17#include "webrtc/base/checks.h"
18#include "webrtc/base/logging.h"
19#include "webrtc/modules/include/module_common_types.h"
20#include "webrtc/modules/pacing/bitrate_prober.h"
21#include "webrtc/system_wrappers/include/clock.h"
22#include "webrtc/system_wrappers/include/critical_section_wrapper.h"
23#include "webrtc/system_wrappers/include/field_trial.h"
24
namespace {

// Pacing interval, in milliseconds: the minimum spacing between two packet
// bursts, and the amount of time the budgets are pre-filled with on creation.
constexpr int64_t kMinPacketLimitMs = 5;

// Ceiling, in milliseconds, on the elapsed time credited to the budgets in a
// single Process() call, so that a long gap between calls cannot hand out an
// oversized budget.
constexpr int64_t kMaxIntervalTimeMs = 30;

}  // namespace
34
35// TODO(sprang): Move at least PacketQueue and MediaBudget out to separate
36// files, so that we can more easily test them.
37
38namespace webrtc {
39namespace paced_sender {
40struct Packet {
41  Packet(RtpPacketSender::Priority priority,
42         uint32_t ssrc,
43         uint16_t seq_number,
44         int64_t capture_time_ms,
45         int64_t enqueue_time_ms,
46         size_t length_in_bytes,
47         bool retransmission,
48         uint64_t enqueue_order)
49      : priority(priority),
50        ssrc(ssrc),
51        sequence_number(seq_number),
52        capture_time_ms(capture_time_ms),
53        enqueue_time_ms(enqueue_time_ms),
54        bytes(length_in_bytes),
55        retransmission(retransmission),
56        enqueue_order(enqueue_order) {}
57
58  RtpPacketSender::Priority priority;
59  uint32_t ssrc;
60  uint16_t sequence_number;
61  int64_t capture_time_ms;
62  int64_t enqueue_time_ms;
63  size_t bytes;
64  bool retransmission;
65  uint64_t enqueue_order;
66  std::list<Packet>::iterator this_it;
67};
68
69// Used by priority queue to sort packets.
70struct Comparator {
71  bool operator()(const Packet* first, const Packet* second) {
72    // Highest prio = 0.
73    if (first->priority != second->priority)
74      return first->priority > second->priority;
75
76    // Retransmissions go first.
77    if (second->retransmission && !first->retransmission)
78      return true;
79
80    // Older frames have higher prio.
81    if (first->capture_time_ms != second->capture_time_ms)
82      return first->capture_time_ms > second->capture_time_ms;
83
84    return first->enqueue_order > second->enqueue_order;
85  }
86};
87
88// Class encapsulating a priority queue with some extensions.
89class PacketQueue {
90 public:
91  explicit PacketQueue(Clock* clock)
92      : bytes_(0),
93        clock_(clock),
94        queue_time_sum_(0),
95        time_last_updated_(clock_->TimeInMilliseconds()) {}
96  virtual ~PacketQueue() {}
97
98  void Push(const Packet& packet) {
99    if (!AddToDupeSet(packet))
100      return;
101
102    UpdateQueueTime(packet.enqueue_time_ms);
103
104    // Store packet in list, use pointers in priority queue for cheaper moves.
105    // Packets have a handle to its own iterator in the list, for easy removal
106    // when popping from queue.
107    packet_list_.push_front(packet);
108    std::list<Packet>::iterator it = packet_list_.begin();
109    it->this_it = it;          // Handle for direct removal from list.
110    prio_queue_.push(&(*it));  // Pointer into list.
111    bytes_ += packet.bytes;
112  }
113
114  const Packet& BeginPop() {
115    const Packet& packet = *prio_queue_.top();
116    prio_queue_.pop();
117    return packet;
118  }
119
120  void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
121
122  void FinalizePop(const Packet& packet) {
123    RemoveFromDupeSet(packet);
124    bytes_ -= packet.bytes;
125    queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms);
126    packet_list_.erase(packet.this_it);
127    RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
128    if (packet_list_.empty())
129      RTC_DCHECK_EQ(0u, queue_time_sum_);
130  }
131
132  bool Empty() const { return prio_queue_.empty(); }
133
134  size_t SizeInPackets() const { return prio_queue_.size(); }
135
136  uint64_t SizeInBytes() const { return bytes_; }
137
138  int64_t OldestEnqueueTimeMs() const {
139    auto it = packet_list_.rbegin();
140    if (it == packet_list_.rend())
141      return 0;
142    return it->enqueue_time_ms;
143  }
144
145  void UpdateQueueTime(int64_t timestamp_ms) {
146    RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
147    int64_t delta = timestamp_ms - time_last_updated_;
148    // Use packet packet_list_.size() not prio_queue_.size() here, as there
149    // might be an outstanding element popped from prio_queue_ currently in the
150    // SendPacket() call, while packet_list_ will always be correct.
151    queue_time_sum_ += delta * packet_list_.size();
152    time_last_updated_ = timestamp_ms;
153  }
154
155  int64_t AverageQueueTimeMs() const {
156    if (prio_queue_.empty())
157      return 0;
158    return queue_time_sum_ / packet_list_.size();
159  }
160
161 private:
162  // Try to add a packet to the set of ssrc/seqno identifiers currently in the
163  // queue. Return true if inserted, false if this is a duplicate.
164  bool AddToDupeSet(const Packet& packet) {
165    SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
166    if (it == dupe_map_.end()) {
167      // First for this ssrc, just insert.
168      dupe_map_[packet.ssrc].insert(packet.sequence_number);
169      return true;
170    }
171
172    // Insert returns a pair, where second is a bool set to true if new element.
173    return it->second.insert(packet.sequence_number).second;
174  }
175
176  void RemoveFromDupeSet(const Packet& packet) {
177    SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
178    RTC_DCHECK(it != dupe_map_.end());
179    it->second.erase(packet.sequence_number);
180    if (it->second.empty()) {
181      dupe_map_.erase(it);
182    }
183  }
184
185  // List of packets, in the order the were enqueued. Since dequeueing may
186  // occur out of order, use list instead of vector.
187  std::list<Packet> packet_list_;
188  // Priority queue of the packets, sorted according to Comparator.
189  // Use pointers into list, to avoid moving whole struct within heap.
190  std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
191  // Total number of bytes in the queue.
192  uint64_t bytes_;
193  // Map<ssrc, set<seq_no> >, for checking duplicates.
194  typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
195  SsrcSeqNoMap dupe_map_;
196  Clock* const clock_;
197  int64_t queue_time_sum_;
198  int64_t time_last_updated_;
199};
200
// Byte budget that is refilled at a target bitrate and drained as data is
// sent.
//
// The budget may go negative (overuse), in which case the debt is paid off by
// later refills, but never below kWindowMs worth of data at the target rate.
// Unused budget is not carried over from one refill to the next.
//
// Rates are in kbps and times in ms, so rate * time is a number of bits and
// rate * time / 8 a number of bytes. All byte arithmetic is done in 64 bits:
// kWindowMs * rate does not fit in an int once the rate exceeds roughly
// 4.29 million kbps, and overflowing there would turn the lower bound into a
// positive number.
class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps),
        bytes_remaining_(0) {}

  // Changes the refill rate. Any existing debt is capped to what the new rate
  // allows.
  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
    bytes_remaining_ = std::max(MinBytesRemaining(), bytes_remaining_);
  }

  // Refills the budget with |delta_time_ms| worth of data at the target rate.
  void IncreaseBudget(int64_t delta_time_ms) {
    const int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;
    if (bytes_remaining_ < 0) {
      // We overused last interval, compensate this interval.
      bytes_remaining_ += bytes;
    } else {
      // If we underused last interval we can't use it this interval.
      bytes_remaining_ = bytes;
    }
  }

  // Charges |bytes| to the budget, limiting the resulting debt to the lower
  // bound.
  void UseBudget(size_t bytes) {
    bytes_remaining_ = std::max(bytes_remaining_ - static_cast<int64_t>(bytes),
                                MinBytesRemaining());
  }

  // Bytes that may still be sent; 0 while the budget is in debt.
  size_t bytes_remaining() const {
    return static_cast<size_t>(std::max<int64_t>(0, bytes_remaining_));
  }

  int target_rate_kbps() const { return target_rate_kbps_; }

 private:
  static const int kWindowMs = 500;

  // Most negative value the budget may take: kWindowMs worth of data at the
  // current target rate, computed in 64 bits to avoid int overflow.
  int64_t MinBytesRemaining() const {
    return -static_cast<int64_t>(kWindowMs) * target_rate_kbps_ / 8;
  }

  int target_rate_kbps_;
  int64_t bytes_remaining_;
};
241}  // namespace paced_sender
242
// Queue-time limit, in milliseconds. Process() raises the media bitrate when
// needed so that the queue can be drained within what is left of this time
// for the average queued packet.
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
// Default pace multiplier. Not referenced in this file; presumably applied by
// callers when deriving the pacing bitrate -- confirm against the header and
// call sites.
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
245
// Creates a pacer that starts out unpaused and with probing enabled.
//   clock            - time source; also handed to the packet queue.
//   callback         - invoked by SendPacket()/SendPadding() to do the actual
//                      sending.
//   bitrate_kbps     - stored (in bps) and passed to the prober when packets
//                      are inserted.
//   max_bitrate_kbps - initial rate of the media budget.
//   min_bitrate_kbps - initial rate of the padding budget.
PacedSender::PacedSender(Clock* clock,
                         Callback* callback,
                         int bitrate_kbps,
                         int max_bitrate_kbps,
                         int min_bitrate_kbps)
    : clock_(clock),
      callback_(callback),
      critsect_(CriticalSectionWrapper::CreateCriticalSection()),
      paused_(false),
      probing_enabled_(true),
      media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
      padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
      prober_(new BitrateProber()),
      bitrate_bps_(1000 * bitrate_kbps),
      max_bitrate_kbps_(max_bitrate_kbps),
      time_last_update_us_(clock->TimeInMicroseconds()),
      packets_(new paced_sender::PacketQueue(clock)),
      packet_counter_(0) {
  // Pre-fill both budgets with one pacing interval worth of data.
  UpdateBytesPerInterval(kMinPacketLimitMs);
}
266
267PacedSender::~PacedSender() {}
268
// Marks the pacer as paused. While paused, SendPacket() refuses everything
// except high priority packets and Process() neither refills the budgets nor
// sends padding.
void PacedSender::Pause() {
  CriticalSectionScoped cs(critsect_.get());
  paused_ = true;
}
273
// Clears the paused flag set by Pause(), so that Process() resumes normal
// pacing.
void PacedSender::Resume() {
  CriticalSectionScoped cs(critsect_.get());
  paused_ = false;
}
278
279void PacedSender::SetProbingEnabled(bool enabled) {
280  RTC_CHECK_EQ(0u, packet_counter_);
281  probing_enabled_ = enabled;
282}
283
284void PacedSender::UpdateBitrate(int bitrate_kbps,
285                                int max_bitrate_kbps,
286                                int min_bitrate_kbps) {
287  CriticalSectionScoped cs(critsect_.get());
288  // Don't set media bitrate here as it may be boosted in order to meet max
289  // queue time constraint. Just update max_bitrate_kbps_ and let media_budget_
290  // be updated in Process().
291  padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
292  bitrate_bps_ = 1000 * bitrate_kbps;
293  max_bitrate_kbps_ = max_bitrate_kbps;
294}
295
296void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
297                               uint32_t ssrc,
298                               uint16_t sequence_number,
299                               int64_t capture_time_ms,
300                               size_t bytes,
301                               bool retransmission) {
302  CriticalSectionScoped cs(critsect_.get());
303
304  if (probing_enabled_ && !prober_->IsProbing())
305    prober_->SetEnabled(true);
306  prober_->MaybeInitializeProbe(bitrate_bps_);
307
308  int64_t now_ms = clock_->TimeInMilliseconds();
309  if (capture_time_ms < 0)
310    capture_time_ms = now_ms;
311
312  packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number,
313                                      capture_time_ms, now_ms, bytes,
314                                      retransmission, packet_counter_++));
315}
316
317int64_t PacedSender::ExpectedQueueTimeMs() const {
318  CriticalSectionScoped cs(critsect_.get());
319  RTC_DCHECK_GT(max_bitrate_kbps_, 0);
320  return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_);
321}
322
// Returns the number of packets currently in the priority queue, read under
// the lock.
size_t PacedSender::QueueSizePackets() const {
  CriticalSectionScoped cs(critsect_.get());
  return packets_->SizeInPackets();
}
327
328int64_t PacedSender::QueueInMs() const {
329  CriticalSectionScoped cs(critsect_.get());
330
331  int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
332  if (oldest_packet == 0)
333    return 0;
334
335  return clock_->TimeInMilliseconds() - oldest_packet;
336}
337
338int64_t PacedSender::AverageQueueTimeMs() {
339  CriticalSectionScoped cs(critsect_.get());
340  packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
341  return packets_->AverageQueueTimeMs();
342}
343
344int64_t PacedSender::TimeUntilNextProcess() {
345  CriticalSectionScoped cs(critsect_.get());
346  if (prober_->IsProbing()) {
347    int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
348    if (ret >= 0)
349      return ret;
350  }
351  int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
352  int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
353  return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
354}
355
356int32_t PacedSender::Process() {
357  int64_t now_us = clock_->TimeInMicroseconds();
358  CriticalSectionScoped cs(critsect_.get());
359  int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
360  time_last_update_us_ = now_us;
361  int target_bitrate_kbps = max_bitrate_kbps_;
362  // TODO(holmer): Remove the !paused_ check when issue 5307 has been fixed.
363  if (!paused_ && elapsed_time_ms > 0) {
364    size_t queue_size_bytes = packets_->SizeInBytes();
365    if (queue_size_bytes > 0) {
366      // Assuming equal size packets and input/output rate, the average packet
367      // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
368      // time constraint shall be met. Determine bitrate needed for that.
369      packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
370      int64_t avg_time_left_ms = std::max<int64_t>(
371          1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs());
372      int min_bitrate_needed_kbps =
373          static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
374      if (min_bitrate_needed_kbps > target_bitrate_kbps)
375        target_bitrate_kbps = min_bitrate_needed_kbps;
376    }
377
378    media_budget_->set_target_rate_kbps(target_bitrate_kbps);
379
380    int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
381    UpdateBytesPerInterval(delta_time_ms);
382  }
383  while (!packets_->Empty()) {
384    if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing())
385      return 0;
386
387    // Since we need to release the lock in order to send, we first pop the
388    // element from the priority queue but keep it in storage, so that we can
389    // reinsert it if send fails.
390    const paced_sender::Packet& packet = packets_->BeginPop();
391
392    if (SendPacket(packet)) {
393      // Send succeeded, remove it from the queue.
394      packets_->FinalizePop(packet);
395      if (prober_->IsProbing())
396        return 0;
397    } else {
398      // Send failed, put it back into the queue.
399      packets_->CancelPop(packet);
400      return 0;
401    }
402  }
403
404  // TODO(holmer): Remove the paused_ check when issue 5307 has been fixed.
405  if (paused_ || !packets_->Empty())
406    return 0;
407
408  size_t padding_needed;
409  if (prober_->IsProbing()) {
410    padding_needed = prober_->RecommendedPacketSize();
411  } else {
412    padding_needed = padding_budget_->bytes_remaining();
413  }
414
415  if (padding_needed > 0)
416    SendPadding(static_cast<size_t>(padding_needed));
417  return 0;
418}
419
420bool PacedSender::SendPacket(const paced_sender::Packet& packet) {
421  // TODO(holmer): Because of this bug issue 5307 we have to send audio
422  // packets even when the pacer is paused. Here we assume audio packets are
423  // always high priority and that they are the only high priority packets.
424  if (paused_ && packet.priority != kHighPriority)
425    return false;
426  critsect_->Leave();
427  const bool success = callback_->TimeToSendPacket(packet.ssrc,
428                                                   packet.sequence_number,
429                                                   packet.capture_time_ms,
430                                                   packet.retransmission);
431  critsect_->Enter();
432
433  // TODO(holmer): High priority packets should only be accounted for if we are
434  // allocating bandwidth for audio.
435  if (success && packet.priority != kHighPriority) {
436    // Update media bytes sent.
437    prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes);
438    media_budget_->UseBudget(packet.bytes);
439    padding_budget_->UseBudget(packet.bytes);
440  }
441
442  return success;
443}
444
445void PacedSender::SendPadding(size_t padding_needed) {
446  critsect_->Leave();
447  size_t bytes_sent = callback_->TimeToSendPadding(padding_needed);
448  critsect_->Enter();
449
450  if (bytes_sent > 0) {
451    prober_->PacketSent(clock_->TimeInMilliseconds(), bytes_sent);
452    media_budget_->UseBudget(bytes_sent);
453    padding_budget_->UseBudget(bytes_sent);
454  }
455}
456
// Refills both the media and the padding budget with |delta_time_ms|
// milliseconds worth of data at their respective target rates.
void PacedSender::UpdateBytesPerInterval(int64_t delta_time_ms) {
  media_budget_->IncreaseBudget(delta_time_ms);
  padding_budget_->IncreaseBudget(delta_time_ms);
}
461}  // namespace webrtc
462