1/*
2 *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
#include "webrtc/modules/pacing/include/paced_sender.h"

#include <assert.h>

#include <algorithm>
#include <list>
#include <set>
#include <utility>

#include "webrtc/modules/interface/module_common_types.h"
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
#include "webrtc/system_wrappers/interface/trace_event.h"
18
namespace {
// Target spacing, in milliseconds, between two consecutive packet bursts.
// It is the interval TimeUntilNextProcess() counts down from and the interval
// used to seed the budgets at construction.
const int kMinPacketLimitMs = 5;

// Largest elapsed interval, in milliseconds, that a single Process() call is
// allowed to credit to the budgets. Protects against a huge budget refill
// when Process() has not run for a long time.
const int kMaxIntervalTimeMs = 30;

// Longest time, in milliseconds, that may pass without anything being sent
// before queued high/normal priority packets are released even though the
// media budget is exhausted. In practice only in effect at low bitrates
// (less than 320 kbits/s).
const int kMaxQueueTimeWithoutSendingMs = 30;

}  // namespace
33
34namespace webrtc {
35
namespace paced_sender {
// Metadata describing one packet waiting in a pacer queue. Only the fields
// below are kept; no payload is stored here.
struct Packet {
  // |enqueue_time_ms| is the time at which the packet was put in the queue and
  // |length_in_bytes| is the size that is charged against the send budgets.
  Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms,
         int64_t enqueue_time_ms, int length_in_bytes, bool retransmission)
      : ssrc_(ssrc),
        sequence_number_(seq_number),
        capture_time_ms_(capture_time_ms),
        enqueue_time_ms_(enqueue_time_ms),
        bytes_(length_in_bytes),
        retransmission_(retransmission) {
  }
  uint32_t ssrc_;
  uint16_t sequence_number_;
  int64_t capture_time_ms_;
  int64_t enqueue_time_ms_;
  int bytes_;
  bool retransmission_;
};

// STL list style class which prevents duplicates in the list. Two packets are
// duplicates when both their SSRC and their sequence number match; packets
// from different SSRCs that merely share a sequence number are both kept.
class PacketList {
 public:
  PacketList() {}

  bool empty() const {
    return packet_list_.empty();
  }

  // Returns a copy of the oldest packet. Must not be called on an empty list.
  Packet front() const {
    assert(!packet_list_.empty());
    return packet_list_.front();
  }

  // Removes the oldest packet and forgets its key, so an identical packet can
  // be queued again afterwards. Does nothing when the list is empty.
  void pop_front() {
    if (packet_list_.empty()) {
      return;
    }
    // Compute the key before the element is destroyed by pop_front().
    const PacketKey key = KeyOf(packet_list_.front());
    packet_list_.pop_front();
    packet_keys_.erase(key);
  }

  // Appends |packet| unless a packet with the same SSRC and sequence number is
  // already queued, in which case the call is silently ignored.
  void push_back(const Packet& packet) {
    // insert() reports whether the key was new, so a single lookup both
    // detects duplicates and records the packet.
    if (packet_keys_.insert(KeyOf(packet)).second) {
      packet_list_.push_back(packet);
    }
  }

 private:
  // Identity of a queued packet: (SSRC, sequence number).
  typedef std::pair<uint32_t, uint16_t> PacketKey;

  static PacketKey KeyOf(const Packet& packet) {
    return PacketKey(packet.ssrc_, packet.sequence_number_);
  }

  std::list<Packet> packet_list_;
  std::set<PacketKey> packet_keys_;
};

// Tracks how many bytes may still be sent in the current interval for a given
// target rate. The remaining budget may go negative (overuse), but never
// below kMaxDebtWindowMs worth of bytes at the current target rate.
class IntervalBudget {
 public:
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps),
        bytes_remaining_(0) {}

  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
  }

  // Credits the budget with |delta_time_ms| worth of bytes at the target rate.
  void IncreaseBudget(int delta_time_ms) {
    const int bytes = BytesInInterval(delta_time_ms);
    if (bytes_remaining_ < 0) {
      // We overused last interval, compensate this interval.
      bytes_remaining_ += bytes;
    } else {
      // If we underused last interval we can't use it this interval.
      bytes_remaining_ = bytes;
    }
  }

  // Charges |bytes| to the budget, clamping the resulting debt.
  void UseBudget(int bytes) {
    bytes_remaining_ = std::max(bytes_remaining_ - bytes,
                                -BytesInInterval(kMaxDebtWindowMs));
  }

  int bytes_remaining() const { return bytes_remaining_; }

 private:
  // The accumulated debt is limited to this many milliseconds of sending at
  // the target rate.
  static const int kMaxDebtWindowMs = 500;

  // Bytes that correspond to |interval_ms| at the target rate
  // (kbit/s * ms = bits; / 8 = bytes). The product is formed in 64 bits so it
  // cannot overflow int for high target rates.
  int BytesInInterval(int interval_ms) const {
    return static_cast<int>(
        static_cast<int64_t>(target_rate_kbps_) * interval_ms / 8);
  }

  int target_rate_kbps_;
  int bytes_remaining_;
};
}  // namespace paced_sender
122
123PacedSender::PacedSender(Callback* callback,
124                         int max_bitrate_kbps,
125                         int min_bitrate_kbps)
126    : callback_(callback),
127      enabled_(true),
128      paused_(false),
129      max_queue_length_ms_(kDefaultMaxQueueLengthMs),
130      critsect_(CriticalSectionWrapper::CreateCriticalSection()),
131      media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
132      padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
133      time_last_update_(TickTime::Now()),
134      capture_time_ms_last_queued_(0),
135      capture_time_ms_last_sent_(0),
136      high_priority_packets_(new paced_sender::PacketList),
137      normal_priority_packets_(new paced_sender::PacketList),
138      low_priority_packets_(new paced_sender::PacketList) {
139  UpdateBytesPerInterval(kMinPacketLimitMs);
140}
141
142PacedSender::~PacedSender() {
143}
144
145void PacedSender::Pause() {
146  CriticalSectionScoped cs(critsect_.get());
147  paused_ = true;
148}
149
150void PacedSender::Resume() {
151  CriticalSectionScoped cs(critsect_.get());
152  paused_ = false;
153}
154
155void PacedSender::SetStatus(bool enable) {
156  CriticalSectionScoped cs(critsect_.get());
157  enabled_ = enable;
158}
159
160bool PacedSender::Enabled() const {
161  CriticalSectionScoped cs(critsect_.get());
162  return enabled_;
163}
164
165void PacedSender::UpdateBitrate(int max_bitrate_kbps,
166                                int min_bitrate_kbps) {
167  CriticalSectionScoped cs(critsect_.get());
168  media_budget_->set_target_rate_kbps(max_bitrate_kbps);
169  padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
170}
171
172bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
173    uint16_t sequence_number, int64_t capture_time_ms, int bytes,
174    bool retransmission) {
175  CriticalSectionScoped cs(critsect_.get());
176
177  if (!enabled_) {
178    return true;  // We can send now.
179  }
180  if (capture_time_ms < 0) {
181    capture_time_ms = TickTime::MillisecondTimestamp();
182  }
183  if (priority != kHighPriority &&
184      capture_time_ms > capture_time_ms_last_queued_) {
185    capture_time_ms_last_queued_ = capture_time_ms;
186    TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
187                             "capture_time_ms", capture_time_ms);
188  }
189  paced_sender::PacketList* packet_list = NULL;
190  switch (priority) {
191    case kHighPriority:
192      packet_list = high_priority_packets_.get();
193      break;
194    case kNormalPriority:
195      packet_list = normal_priority_packets_.get();
196      break;
197    case kLowPriority:
198      packet_list = low_priority_packets_.get();
199      break;
200  }
201  packet_list->push_back(paced_sender::Packet(ssrc,
202                                              sequence_number,
203                                              capture_time_ms,
204                                              TickTime::MillisecondTimestamp(),
205                                              bytes,
206                                              retransmission));
207  return false;
208}
209
210void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
211  CriticalSectionScoped cs(critsect_.get());
212  max_queue_length_ms_ = max_queue_length_ms;
213}
214
215int PacedSender::QueueInMs() const {
216  CriticalSectionScoped cs(critsect_.get());
217  int64_t now_ms = TickTime::MillisecondTimestamp();
218  int64_t oldest_packet_enqueue_time = now_ms;
219  if (!high_priority_packets_->empty()) {
220    oldest_packet_enqueue_time = std::min(
221        oldest_packet_enqueue_time,
222        high_priority_packets_->front().enqueue_time_ms_);
223  }
224  if (!normal_priority_packets_->empty()) {
225    oldest_packet_enqueue_time = std::min(
226        oldest_packet_enqueue_time,
227        normal_priority_packets_->front().enqueue_time_ms_);
228  }
229  if (!low_priority_packets_->empty()) {
230    oldest_packet_enqueue_time = std::min(
231        oldest_packet_enqueue_time,
232        low_priority_packets_->front().enqueue_time_ms_);
233  }
234  return now_ms - oldest_packet_enqueue_time;
235}
236
237int32_t PacedSender::TimeUntilNextProcess() {
238  CriticalSectionScoped cs(critsect_.get());
239  int64_t elapsed_time_ms =
240      (TickTime::Now() - time_last_update_).Milliseconds();
241  if (elapsed_time_ms <= 0) {
242    return kMinPacketLimitMs;
243  }
244  if (elapsed_time_ms >= kMinPacketLimitMs) {
245    return 0;
246  }
247  return kMinPacketLimitMs - elapsed_time_ms;
248}
249
250int32_t PacedSender::Process() {
251  TickTime now = TickTime::Now();
252  CriticalSectionScoped cs(critsect_.get());
253  int elapsed_time_ms = (now - time_last_update_).Milliseconds();
254  time_last_update_ = now;
255  if (!enabled_) {
256    return 0;
257  }
258  if (!paused_) {
259    if (elapsed_time_ms > 0) {
260      uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
261      UpdateBytesPerInterval(delta_time_ms);
262    }
263    paced_sender::PacketList* packet_list;
264    while (ShouldSendNextPacket(&packet_list)) {
265      if (!SendPacketFromList(packet_list))
266        return 0;
267    }
268    if (high_priority_packets_->empty() &&
269        normal_priority_packets_->empty() &&
270        low_priority_packets_->empty() &&
271        padding_budget_->bytes_remaining() > 0) {
272      int padding_needed = padding_budget_->bytes_remaining();
273      critsect_->Leave();
274      int bytes_sent = callback_->TimeToSendPadding(padding_needed);
275      critsect_->Enter();
276      media_budget_->UseBudget(bytes_sent);
277      padding_budget_->UseBudget(bytes_sent);
278    }
279  }
280  return 0;
281}
282
283// MUST have critsect_ when calling.
284bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
285    EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
286  paced_sender::Packet packet = GetNextPacketFromList(packet_list);
287  critsect_->Leave();
288
289  const bool success = callback_->TimeToSendPacket(packet.ssrc_,
290                                                   packet.sequence_number_,
291                                                   packet.capture_time_ms_,
292                                                   packet.retransmission_);
293  critsect_->Enter();
294  // If packet cannot be sent then keep it in packet list and exit early.
295  // There's no need to send more packets.
296  if (!success) {
297    return false;
298  }
299  packet_list->pop_front();
300  const bool last_packet = packet_list->empty() ||
301      packet_list->front().capture_time_ms_ > packet.capture_time_ms_;
302  if (packet_list != high_priority_packets_.get()) {
303    if (packet.capture_time_ms_ > capture_time_ms_last_sent_) {
304      capture_time_ms_last_sent_ = packet.capture_time_ms_;
305    } else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ &&
306               last_packet) {
307      TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend",
308          packet.capture_time_ms_);
309    }
310  }
311  return true;
312}
313
314// MUST have critsect_ when calling.
315void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
316  media_budget_->IncreaseBudget(delta_time_ms);
317  padding_budget_->IncreaseBudget(delta_time_ms);
318}
319
320// MUST have critsect_ when calling.
321bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
322  *packet_list = NULL;
323  if (media_budget_->bytes_remaining() <= 0) {
324    // All bytes consumed for this interval.
325    // Check if we have not sent in a too long time.
326    if ((TickTime::Now() - time_last_send_).Milliseconds() >
327        kMaxQueueTimeWithoutSendingMs) {
328      if (!high_priority_packets_->empty()) {
329        *packet_list = high_priority_packets_.get();
330        return true;
331      }
332      if (!normal_priority_packets_->empty()) {
333        *packet_list = normal_priority_packets_.get();
334        return true;
335      }
336    }
337    // Send any old packets to avoid queuing for too long.
338    if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
339      int64_t high_priority_capture_time = -1;
340      if (!high_priority_packets_->empty()) {
341        high_priority_capture_time =
342            high_priority_packets_->front().capture_time_ms_;
343        *packet_list = high_priority_packets_.get();
344      }
345      if (!normal_priority_packets_->empty() &&
346          (high_priority_capture_time == -1 || high_priority_capture_time >
347          normal_priority_packets_->front().capture_time_ms_)) {
348        *packet_list = normal_priority_packets_.get();
349      }
350      if (*packet_list)
351        return true;
352    }
353    return false;
354  }
355  if (!high_priority_packets_->empty()) {
356    *packet_list = high_priority_packets_.get();
357    return true;
358  }
359  if (!normal_priority_packets_->empty()) {
360    *packet_list = normal_priority_packets_.get();
361    return true;
362  }
363  if (!low_priority_packets_->empty()) {
364    *packet_list = low_priority_packets_.get();
365    return true;
366  }
367  return false;
368}
369
370paced_sender::Packet PacedSender::GetNextPacketFromList(
371    paced_sender::PacketList* packets) {
372  paced_sender::Packet packet = packets->front();
373  UpdateMediaBytesSent(packet.bytes_);
374  return packet;
375}
376
377// MUST have critsect_ when calling.
378void PacedSender::UpdateMediaBytesSent(int num_bytes) {
379  time_last_send_ = TickTime::Now();
380  media_budget_->UseBudget(num_bytes);
381  padding_budget_->UseBudget(num_bytes);
382}
383
384}  // namespace webrtc
385