1/*
2 *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
#include "webrtc/modules/pacing/include/paced_sender.h"

#include <assert.h>

#include <algorithm>
#include <list>
#include <map>
#include <set>

#include "webrtc/modules/interface/module_common_types.h"
#include "webrtc/system_wrappers/interface/clock.h"
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
#include "webrtc/system_wrappers/interface/trace_event.h"
22
namespace {
// Minimum spacing, in milliseconds, between two packet bursts. This is the
// longest wait TimeUntilNextProcess() reports.
const int kMinPacketLimitMs = 5;

// Largest elapsed interval, in milliseconds, that a single Process() call
// credits to the budgets, in case Process() has not run for a long time.
const int kMaxIntervalTimeMs = 30;

// Longest time, in microseconds, that may pass without any packet being sent
// before the head of the queue is sent regardless of the budget state. In
// practice only in effect at low bitrates (less than 320 kbits/s).
const int kMaxQueueTimeWithoutSendingUs = 30 * 1000;

}  // namespace
37
38namespace webrtc {
39namespace paced_sender {
// Bookkeeping record for one queued packet. Only metadata is stored here; the
// sender hands ssrc, sequence number, capture time and the retransmission flag
// back to the callback when it is time to send.
struct Packet {
  uint32_t ssrc;             // Stream the packet belongs to.
  uint16_t sequence_number;  // Sequence number within that stream.
  int64_t capture_time_ms;   // Capture time, in milliseconds.
  int64_t enqueue_time_ms;   // Time the packet entered the queue, in ms.
  int bytes;                 // Packet size charged against the budgets.
  bool retransmission;       // True if the packet is a retransmission.

  Packet(uint32_t ssrc,
         uint16_t seq_number,
         int64_t capture_time_ms,
         int64_t enqueue_time_ms,
         int length_in_bytes,
         bool retransmission)
      : ssrc(ssrc),
        sequence_number(seq_number),
        capture_time_ms(capture_time_ms),
        enqueue_time_ms(enqueue_time_ms),
        bytes(length_in_bytes),
        retransmission(retransmission) {
  }
};
60
61// STL list style class which prevents duplicates in the list.
62class PacketList {
63 public:
64  PacketList() {};
65
66  bool empty() const {
67    return packet_list_.empty();
68  }
69
70  Packet front() const {
71    return packet_list_.front();
72  }
73
74  void pop_front() {
75    Packet& packet = packet_list_.front();
76    uint16_t sequence_number = packet.sequence_number;
77    uint32_t ssrc = packet.ssrc;
78    packet_list_.pop_front();
79    sequence_number_set_[ssrc].erase(sequence_number);
80  }
81
82  void push_back(const Packet& packet) {
83    if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) ==
84        sequence_number_set_[packet.ssrc].end()) {
85      // Don't insert duplicates.
86      packet_list_.push_back(packet);
87      sequence_number_set_[packet.ssrc].insert(packet.sequence_number);
88    }
89  }
90
91 private:
92  std::list<Packet> packet_list_;
93  std::map<uint32_t, std::set<uint16_t> > sequence_number_set_;
94};
95
// Tracks how many bytes may still be sent in the current interval, given a
// target rate in kbps. The balance goes negative when more was sent than the
// budget allowed; that debt is paid back by later intervals.
class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps), bytes_remaining_(0) {
  }

  // Changes the rate used by later IncreaseBudget()/UseBudget() calls. The
  // current balance is left untouched.
  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
  }

  // Starts a new interval of |delta_time_ms| milliseconds.
  void IncreaseBudget(int delta_time_ms) {
    // kbps * ms = bits; divide by 8 to get bytes.
    const int interval_bytes = target_rate_kbps_ * delta_time_ms / 8;
    // A debt from the previous interval is deducted from the new allowance,
    // whereas an unused surplus is discarded rather than carried over.
    const bool in_debt = bytes_remaining_ < 0;
    bytes_remaining_ =
        in_debt ? bytes_remaining_ + interval_bytes : interval_bytes;
  }

  // Charges |bytes| against the budget. The debt is capped at what the target
  // rate produces in 500 ms.
  void UseBudget(int bytes) {
    const int max_debt_bytes = -500 * target_rate_kbps_ / 8;
    const int balance = bytes_remaining_ - bytes;
    bytes_remaining_ = std::max(balance, max_debt_bytes);
  }

  // Bytes left in the current interval; negative when overused.
  int bytes_remaining() const { return bytes_remaining_; }

 private:
  int target_rate_kbps_;
  int bytes_remaining_;
};
128}  // namespace paced_sender
129
130const float PacedSender::kDefaultPaceMultiplier = 2.5f;
131
132PacedSender::PacedSender(Clock* clock,
133                         Callback* callback,
134                         int max_bitrate_kbps,
135                         int min_bitrate_kbps)
136    : clock_(clock),
137      callback_(callback),
138      critsect_(CriticalSectionWrapper::CreateCriticalSection()),
139      enabled_(true),
140      paused_(false),
141      max_queue_length_ms_(kDefaultMaxQueueLengthMs),
142      media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
143      padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
144      time_last_update_us_(clock->TimeInMicroseconds()),
145      capture_time_ms_last_queued_(0),
146      capture_time_ms_last_sent_(0),
147      high_priority_packets_(new paced_sender::PacketList),
148      normal_priority_packets_(new paced_sender::PacketList),
149      low_priority_packets_(new paced_sender::PacketList) {
150  UpdateBytesPerInterval(kMinPacketLimitMs);
151}
152
153PacedSender::~PacedSender() {}
154
155void PacedSender::Pause() {
156  CriticalSectionScoped cs(critsect_.get());
157  paused_ = true;
158}
159
160void PacedSender::Resume() {
161  CriticalSectionScoped cs(critsect_.get());
162  paused_ = false;
163}
164
165void PacedSender::SetStatus(bool enable) {
166  CriticalSectionScoped cs(critsect_.get());
167  enabled_ = enable;
168}
169
170bool PacedSender::Enabled() const {
171  CriticalSectionScoped cs(critsect_.get());
172  return enabled_;
173}
174
175void PacedSender::UpdateBitrate(int max_bitrate_kbps,
176                                int min_bitrate_kbps) {
177  CriticalSectionScoped cs(critsect_.get());
178  media_budget_->set_target_rate_kbps(max_bitrate_kbps);
179  padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
180}
181
182bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
183    uint16_t sequence_number, int64_t capture_time_ms, int bytes,
184    bool retransmission) {
185  CriticalSectionScoped cs(critsect_.get());
186
187  if (!enabled_) {
188    return true;  // We can send now.
189  }
190  if (capture_time_ms < 0) {
191    capture_time_ms = clock_->TimeInMilliseconds();
192  }
193  if (priority != kHighPriority &&
194      capture_time_ms > capture_time_ms_last_queued_) {
195    capture_time_ms_last_queued_ = capture_time_ms;
196    TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
197                             "capture_time_ms", capture_time_ms);
198  }
199  paced_sender::PacketList* packet_list = NULL;
200  switch (priority) {
201    case kHighPriority:
202      packet_list = high_priority_packets_.get();
203      break;
204    case kNormalPriority:
205      packet_list = normal_priority_packets_.get();
206      break;
207    case kLowPriority:
208      packet_list = low_priority_packets_.get();
209      break;
210  }
211  packet_list->push_back(paced_sender::Packet(ssrc,
212                                              sequence_number,
213                                              capture_time_ms,
214                                              clock_->TimeInMilliseconds(),
215                                              bytes,
216                                              retransmission));
217  return false;
218}
219
220void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
221  CriticalSectionScoped cs(critsect_.get());
222  max_queue_length_ms_ = max_queue_length_ms;
223}
224
225int PacedSender::QueueInMs() const {
226  CriticalSectionScoped cs(critsect_.get());
227  int64_t now_ms = clock_->TimeInMilliseconds();
228  int64_t oldest_packet_enqueue_time = now_ms;
229  if (!high_priority_packets_->empty()) {
230    oldest_packet_enqueue_time =
231        std::min(oldest_packet_enqueue_time,
232                 high_priority_packets_->front().enqueue_time_ms);
233  }
234  if (!normal_priority_packets_->empty()) {
235    oldest_packet_enqueue_time =
236        std::min(oldest_packet_enqueue_time,
237                 normal_priority_packets_->front().enqueue_time_ms);
238  }
239  if (!low_priority_packets_->empty()) {
240    oldest_packet_enqueue_time =
241        std::min(oldest_packet_enqueue_time,
242                 low_priority_packets_->front().enqueue_time_ms);
243  }
244  return now_ms - oldest_packet_enqueue_time;
245}
246
247int32_t PacedSender::TimeUntilNextProcess() {
248  CriticalSectionScoped cs(critsect_.get());
249  int64_t elapsed_time_ms = (clock_->TimeInMicroseconds() -
250      time_last_update_us_ + 500) / 1000;
251  if (elapsed_time_ms <= 0) {
252    return kMinPacketLimitMs;
253  }
254  if (elapsed_time_ms >= kMinPacketLimitMs) {
255    return 0;
256  }
257  return kMinPacketLimitMs - elapsed_time_ms;
258}
259
260int32_t PacedSender::Process() {
261  int64_t now_us = clock_->TimeInMicroseconds();
262  CriticalSectionScoped cs(critsect_.get());
263  int elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
264  time_last_update_us_ = now_us;
265  if (!enabled_) {
266    return 0;
267  }
268  if (!paused_) {
269    if (elapsed_time_ms > 0) {
270      uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
271      UpdateBytesPerInterval(delta_time_ms);
272    }
273    paced_sender::PacketList* packet_list;
274    while (ShouldSendNextPacket(&packet_list)) {
275      if (!SendPacketFromList(packet_list))
276        return 0;
277    }
278    if (high_priority_packets_->empty() &&
279        normal_priority_packets_->empty() &&
280        low_priority_packets_->empty() &&
281        padding_budget_->bytes_remaining() > 0) {
282      int padding_needed = padding_budget_->bytes_remaining();
283      critsect_->Leave();
284      int bytes_sent = callback_->TimeToSendPadding(padding_needed);
285      critsect_->Enter();
286      media_budget_->UseBudget(bytes_sent);
287      padding_budget_->UseBudget(bytes_sent);
288    }
289  }
290  return 0;
291}
292
293bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
294    EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
295  paced_sender::Packet packet = GetNextPacketFromList(packet_list);
296  critsect_->Leave();
297
298  const bool success = callback_->TimeToSendPacket(packet.ssrc,
299                                                   packet.sequence_number,
300                                                   packet.capture_time_ms,
301                                                   packet.retransmission);
302  critsect_->Enter();
303  // If packet cannot be sent then keep it in packet list and exit early.
304  // There's no need to send more packets.
305  if (!success) {
306    return false;
307  }
308  packet_list->pop_front();
309  const bool last_packet =
310      packet_list->empty() ||
311      packet_list->front().capture_time_ms > packet.capture_time_ms;
312  if (packet_list != high_priority_packets_.get()) {
313    if (packet.capture_time_ms > capture_time_ms_last_sent_) {
314      capture_time_ms_last_sent_ = packet.capture_time_ms;
315    } else if (packet.capture_time_ms == capture_time_ms_last_sent_ &&
316               last_packet) {
317      TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms);
318    }
319  }
320  return true;
321}
322
323void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
324  media_budget_->IncreaseBudget(delta_time_ms);
325  padding_budget_->IncreaseBudget(delta_time_ms);
326}
327
328bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
329  *packet_list = NULL;
330  if (media_budget_->bytes_remaining() <= 0) {
331    // All bytes consumed for this interval.
332    // Check if we have not sent in a too long time.
333    if (clock_->TimeInMicroseconds() - time_last_send_us_ >
334        kMaxQueueTimeWithoutSendingUs) {
335      if (!high_priority_packets_->empty()) {
336        *packet_list = high_priority_packets_.get();
337        return true;
338      }
339      if (!normal_priority_packets_->empty()) {
340        *packet_list = normal_priority_packets_.get();
341        return true;
342      }
343    }
344    // Send any old packets to avoid queuing for too long.
345    if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
346      int64_t high_priority_capture_time = -1;
347      if (!high_priority_packets_->empty()) {
348        high_priority_capture_time =
349            high_priority_packets_->front().capture_time_ms;
350        *packet_list = high_priority_packets_.get();
351      }
352      if (!normal_priority_packets_->empty() &&
353          (high_priority_capture_time == -1 ||
354           high_priority_capture_time >
355               normal_priority_packets_->front().capture_time_ms)) {
356        *packet_list = normal_priority_packets_.get();
357      }
358      if (*packet_list)
359        return true;
360    }
361    return false;
362  }
363  if (!high_priority_packets_->empty()) {
364    *packet_list = high_priority_packets_.get();
365    return true;
366  }
367  if (!normal_priority_packets_->empty()) {
368    *packet_list = normal_priority_packets_.get();
369    return true;
370  }
371  if (!low_priority_packets_->empty()) {
372    *packet_list = low_priority_packets_.get();
373    return true;
374  }
375  return false;
376}
377
378paced_sender::Packet PacedSender::GetNextPacketFromList(
379    paced_sender::PacketList* packets) {
380  paced_sender::Packet packet = packets->front();
381  UpdateMediaBytesSent(packet.bytes);
382  return packet;
383}
384
385void PacedSender::UpdateMediaBytesSent(int num_bytes) {
386  time_last_send_us_ = clock_->TimeInMicroseconds();
387  media_budget_->UseBudget(num_bytes);
388  padding_budget_->UseBudget(num_bytes);
389}
390
391}  // namespace webrtc
392