// paced_sender.cc revision 875ad49dee49e95d212a91eb9bb1af327a80ee85
/*
 *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
#include "webrtc/modules/pacing/include/paced_sender.h"

#include <assert.h>

#include <algorithm>
#include <list>
#include <set>

#include "webrtc/modules/interface/module_common_types.h"
#include "webrtc/system_wrappers/interface/clock.h"
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
#include "webrtc/system_wrappers/interface/trace_event.h"
19
namespace {
// Time limit in milliseconds between packet bursts. Also used as the interval
// that seeds the budgets when a PacedSender is constructed.
const int kMinPacketLimitMs = 5;

// Upper cap on process interval, in case process has not been called in a long
// time.
const int kMaxIntervalTimeMs = 30;

// Max time that the first packet in the queue can sit in the queue if no
// packets are sent, regardless of buffer state. In practice only in effect at
// low bitrates (less than 320 kbits/s).
const int kMaxQueueTimeWithoutSendingMs = 30;

}  // namespace
34
35namespace webrtc {
36
37namespace paced_sender {
// Bookkeeping record for one packet waiting in the pacer. Only metadata is
// kept here; the struct carries no payload.
struct Packet {
  // Stores every argument verbatim in the member of the matching name;
  // |length_in_bytes| ends up in |bytes_|.
  Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms,
         int64_t enqueue_time_ms, int length_in_bytes, bool retransmission)
      : ssrc_(ssrc),
        sequence_number_(seq_number),
        capture_time_ms_(capture_time_ms),
        enqueue_time_ms_(enqueue_time_ms),
        bytes_(length_in_bytes),
        retransmission_(retransmission) {
  }
  uint32_t ssrc_;             // Stream identifier handed back to the callback.
  uint16_t sequence_number_;  // Also the key used for duplicate detection.
  int64_t capture_time_ms_;   // Capture timestamp supplied by the caller.
  int64_t enqueue_time_ms_;   // Clock reading taken when the packet was queued.
  int bytes_;                 // Size charged against the send budgets.
  bool retransmission_;       // Forwarded unchanged to the send callback.
};
55
56// STL list style class which prevents duplicates in the list.
57class PacketList {
58 public:
59  PacketList() {};
60
61  bool empty() const {
62    return packet_list_.empty();
63  }
64
65  Packet front() const {
66    return packet_list_.front();
67  }
68
69  void pop_front() {
70    Packet& packet = packet_list_.front();
71    uint16_t sequence_number = packet.sequence_number_;
72    packet_list_.pop_front();
73    sequence_number_set_.erase(sequence_number);
74  }
75
76  void push_back(const Packet& packet) {
77    if (sequence_number_set_.find(packet.sequence_number_) ==
78        sequence_number_set_.end()) {
79      // Don't insert duplicates.
80      packet_list_.push_back(packet);
81      sequence_number_set_.insert(packet.sequence_number_);
82    }
83  }
84
85 private:
86  std::list<Packet> packet_list_;
87  std::set<uint16_t> sequence_number_set_;
88};
89
// Byte budget that is refilled from a target bitrate each processing interval
// and drained as data is sent.
class IntervalBudget {
 public:
  // The budget starts empty; call IncreaseBudget() to make bytes available.
  explicit IntervalBudget(int initial_target_rate_kbps)
      : target_rate_kbps_(initial_target_rate_kbps),
        bytes_remaining_(0) {}

  // Changes the rate used by later IncreaseBudget()/UseBudget() calls. The
  // bytes currently remaining are left untouched.
  void set_target_rate_kbps(int target_rate_kbps) {
    target_rate_kbps_ = target_rate_kbps;
  }

  // Refills the budget with |delta_time_ms| worth of bytes at the target rate.
  void IncreaseBudget(int delta_time_ms) {
    const int bytes = BytesAtTargetRate(delta_time_ms);
    if (bytes_remaining_ < 0) {
      // We overused last interval, compensate this interval.
      bytes_remaining_ = bytes_remaining_ + bytes;
    } else {
      // If we underused last interval we can't use it this interval.
      bytes_remaining_ = bytes;
    }
  }

  // Deducts |bytes| from the budget. The budget may go negative, but never
  // below kMaxDebtMs worth of bytes at the current target rate.
  void UseBudget(int bytes) {
    bytes_remaining_ = std::max(bytes_remaining_ - bytes,
                                -BytesAtTargetRate(kMaxDebtMs));
  }

  // Bytes still available in this interval; negative after overuse.
  int bytes_remaining() const { return bytes_remaining_; }

 private:
  // Largest debt the budget may accumulate, expressed as milliseconds of
  // sending at the target rate.
  enum { kMaxDebtMs = 500 };

  // Number of bytes sent in |time_ms| at the target rate (kbps * ms / 8).
  // The product is formed in 64 bits so that high target rates do not
  // overflow int, which would be undefined behavior.
  int BytesAtTargetRate(int time_ms) const {
    return static_cast<int>(
        static_cast<int64_t>(target_rate_kbps_) * time_ms / 8);
  }

  int target_rate_kbps_;
  int bytes_remaining_;
};
122}  // namespace paced_sender
123
124const float PacedSender::kDefaultPaceMultiplier = 2.5f;
125
126PacedSender::PacedSender(Clock* clock,
127                         Callback* callback,
128                         int max_bitrate_kbps,
129                         int min_bitrate_kbps)
130    : clock_(clock),
131      callback_(callback),
132      enabled_(true),
133      paused_(false),
134      max_queue_length_ms_(kDefaultMaxQueueLengthMs),
135      critsect_(CriticalSectionWrapper::CreateCriticalSection()),
136      media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
137      padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
138      time_last_update_(TickTime::Now()),
139      capture_time_ms_last_queued_(0),
140      capture_time_ms_last_sent_(0),
141      high_priority_packets_(new paced_sender::PacketList),
142      normal_priority_packets_(new paced_sender::PacketList),
143      low_priority_packets_(new paced_sender::PacketList) {
144  UpdateBytesPerInterval(kMinPacketLimitMs);
145}
146
147PacedSender::~PacedSender() {
148}
149
150void PacedSender::Pause() {
151  CriticalSectionScoped cs(critsect_.get());
152  paused_ = true;
153}
154
155void PacedSender::Resume() {
156  CriticalSectionScoped cs(critsect_.get());
157  paused_ = false;
158}
159
160void PacedSender::SetStatus(bool enable) {
161  CriticalSectionScoped cs(critsect_.get());
162  enabled_ = enable;
163}
164
165bool PacedSender::Enabled() const {
166  CriticalSectionScoped cs(critsect_.get());
167  return enabled_;
168}
169
170void PacedSender::UpdateBitrate(int max_bitrate_kbps,
171                                int min_bitrate_kbps) {
172  CriticalSectionScoped cs(critsect_.get());
173  media_budget_->set_target_rate_kbps(max_bitrate_kbps);
174  padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
175}
176
177bool PacedSender::SendPacket(Priority priority, uint32_t ssrc,
178    uint16_t sequence_number, int64_t capture_time_ms, int bytes,
179    bool retransmission) {
180  CriticalSectionScoped cs(critsect_.get());
181
182  if (!enabled_) {
183    return true;  // We can send now.
184  }
185  if (capture_time_ms < 0) {
186    capture_time_ms = clock_->TimeInMilliseconds();
187  }
188  if (priority != kHighPriority &&
189      capture_time_ms > capture_time_ms_last_queued_) {
190    capture_time_ms_last_queued_ = capture_time_ms;
191    TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms,
192                             "capture_time_ms", capture_time_ms);
193  }
194  paced_sender::PacketList* packet_list = NULL;
195  switch (priority) {
196    case kHighPriority:
197      packet_list = high_priority_packets_.get();
198      break;
199    case kNormalPriority:
200      packet_list = normal_priority_packets_.get();
201      break;
202    case kLowPriority:
203      packet_list = low_priority_packets_.get();
204      break;
205  }
206  packet_list->push_back(paced_sender::Packet(ssrc,
207                                              sequence_number,
208                                              capture_time_ms,
209                                              clock_->TimeInMilliseconds(),
210                                              bytes,
211                                              retransmission));
212  return false;
213}
214
215void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) {
216  CriticalSectionScoped cs(critsect_.get());
217  max_queue_length_ms_ = max_queue_length_ms;
218}
219
220int PacedSender::QueueInMs() const {
221  CriticalSectionScoped cs(critsect_.get());
222  int64_t now_ms = clock_->TimeInMilliseconds();
223  int64_t oldest_packet_enqueue_time = now_ms;
224  if (!high_priority_packets_->empty()) {
225    oldest_packet_enqueue_time = std::min(
226        oldest_packet_enqueue_time,
227        high_priority_packets_->front().enqueue_time_ms_);
228  }
229  if (!normal_priority_packets_->empty()) {
230    oldest_packet_enqueue_time = std::min(
231        oldest_packet_enqueue_time,
232        normal_priority_packets_->front().enqueue_time_ms_);
233  }
234  if (!low_priority_packets_->empty()) {
235    oldest_packet_enqueue_time = std::min(
236        oldest_packet_enqueue_time,
237        low_priority_packets_->front().enqueue_time_ms_);
238  }
239  return now_ms - oldest_packet_enqueue_time;
240}
241
242int32_t PacedSender::TimeUntilNextProcess() {
243  CriticalSectionScoped cs(critsect_.get());
244  int64_t elapsed_time_ms =
245      (TickTime::Now() - time_last_update_).Milliseconds();
246  if (elapsed_time_ms <= 0) {
247    return kMinPacketLimitMs;
248  }
249  if (elapsed_time_ms >= kMinPacketLimitMs) {
250    return 0;
251  }
252  return kMinPacketLimitMs - elapsed_time_ms;
253}
254
255int32_t PacedSender::Process() {
256  TickTime now = TickTime::Now();
257  CriticalSectionScoped cs(critsect_.get());
258  int elapsed_time_ms = (now - time_last_update_).Milliseconds();
259  time_last_update_ = now;
260  if (!enabled_) {
261    return 0;
262  }
263  if (!paused_) {
264    if (elapsed_time_ms > 0) {
265      uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
266      UpdateBytesPerInterval(delta_time_ms);
267    }
268    paced_sender::PacketList* packet_list;
269    while (ShouldSendNextPacket(&packet_list)) {
270      if (!SendPacketFromList(packet_list))
271        return 0;
272    }
273    if (high_priority_packets_->empty() &&
274        normal_priority_packets_->empty() &&
275        low_priority_packets_->empty() &&
276        padding_budget_->bytes_remaining() > 0) {
277      int padding_needed = padding_budget_->bytes_remaining();
278      critsect_->Leave();
279      int bytes_sent = callback_->TimeToSendPadding(padding_needed);
280      critsect_->Enter();
281      media_budget_->UseBudget(bytes_sent);
282      padding_budget_->UseBudget(bytes_sent);
283    }
284  }
285  return 0;
286}
287
288// MUST have critsect_ when calling.
289bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
290    EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) {
291  paced_sender::Packet packet = GetNextPacketFromList(packet_list);
292  critsect_->Leave();
293
294  const bool success = callback_->TimeToSendPacket(packet.ssrc_,
295                                                   packet.sequence_number_,
296                                                   packet.capture_time_ms_,
297                                                   packet.retransmission_);
298  critsect_->Enter();
299  // If packet cannot be sent then keep it in packet list and exit early.
300  // There's no need to send more packets.
301  if (!success) {
302    return false;
303  }
304  packet_list->pop_front();
305  const bool last_packet = packet_list->empty() ||
306      packet_list->front().capture_time_ms_ > packet.capture_time_ms_;
307  if (packet_list != high_priority_packets_.get()) {
308    if (packet.capture_time_ms_ > capture_time_ms_last_sent_) {
309      capture_time_ms_last_sent_ = packet.capture_time_ms_;
310    } else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ &&
311               last_packet) {
312      TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend",
313          packet.capture_time_ms_);
314    }
315  }
316  return true;
317}
318
319// MUST have critsect_ when calling.
320void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) {
321  media_budget_->IncreaseBudget(delta_time_ms);
322  padding_budget_->IncreaseBudget(delta_time_ms);
323}
324
325// MUST have critsect_ when calling.
326bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
327  *packet_list = NULL;
328  if (media_budget_->bytes_remaining() <= 0) {
329    // All bytes consumed for this interval.
330    // Check if we have not sent in a too long time.
331    if ((TickTime::Now() - time_last_send_).Milliseconds() >
332        kMaxQueueTimeWithoutSendingMs) {
333      if (!high_priority_packets_->empty()) {
334        *packet_list = high_priority_packets_.get();
335        return true;
336      }
337      if (!normal_priority_packets_->empty()) {
338        *packet_list = normal_priority_packets_.get();
339        return true;
340      }
341    }
342    // Send any old packets to avoid queuing for too long.
343    if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) {
344      int64_t high_priority_capture_time = -1;
345      if (!high_priority_packets_->empty()) {
346        high_priority_capture_time =
347            high_priority_packets_->front().capture_time_ms_;
348        *packet_list = high_priority_packets_.get();
349      }
350      if (!normal_priority_packets_->empty() &&
351          (high_priority_capture_time == -1 || high_priority_capture_time >
352          normal_priority_packets_->front().capture_time_ms_)) {
353        *packet_list = normal_priority_packets_.get();
354      }
355      if (*packet_list)
356        return true;
357    }
358    return false;
359  }
360  if (!high_priority_packets_->empty()) {
361    *packet_list = high_priority_packets_.get();
362    return true;
363  }
364  if (!normal_priority_packets_->empty()) {
365    *packet_list = normal_priority_packets_.get();
366    return true;
367  }
368  if (!low_priority_packets_->empty()) {
369    *packet_list = low_priority_packets_.get();
370    return true;
371  }
372  return false;
373}
374
375paced_sender::Packet PacedSender::GetNextPacketFromList(
376    paced_sender::PacketList* packets) {
377  paced_sender::Packet packet = packets->front();
378  UpdateMediaBytesSent(packet.bytes_);
379  return packet;
380}
381
382// MUST have critsect_ when calling.
383void PacedSender::UpdateMediaBytesSent(int num_bytes) {
384  time_last_send_ = TickTime::Now();
385  media_budget_->UseBudget(num_bytes);
386  padding_budget_->UseBudget(num_bytes);
387}
388
389}  // namespace webrtc
390