1/* 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11#include "webrtc/modules/pacing/include/paced_sender.h" 12 13#include <assert.h> 14 15#include "webrtc/modules/interface/module_common_types.h" 16#include "webrtc/system_wrappers/interface/critical_section_wrapper.h" 17#include "webrtc/system_wrappers/interface/trace_event.h" 18 19namespace { 20// Time limit in milliseconds between packet bursts. 21const int kMinPacketLimitMs = 5; 22 23// Upper cap on process interval, in case process has not been called in a long 24// time. 25const int kMaxIntervalTimeMs = 30; 26 27// Max time that the first packet in the queue can sit in the queue if no 28// packets are sent, regardless of buffer state. In practice only in effect at 29// low bitrates (less than 320 kbits/s). 30const int kMaxQueueTimeWithoutSendingMs = 30; 31 32} // namespace 33 34namespace webrtc { 35 36namespace paced_sender { 37struct Packet { 38 Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms, 39 int64_t enqueue_time_ms, int length_in_bytes, bool retransmission) 40 : ssrc_(ssrc), 41 sequence_number_(seq_number), 42 capture_time_ms_(capture_time_ms), 43 enqueue_time_ms_(enqueue_time_ms), 44 bytes_(length_in_bytes), 45 retransmission_(retransmission) { 46 } 47 uint32_t ssrc_; 48 uint16_t sequence_number_; 49 int64_t capture_time_ms_; 50 int64_t enqueue_time_ms_; 51 int bytes_; 52 bool retransmission_; 53}; 54 55// STL list style class which prevents duplicates in the list. 56class PacketList { 57 public: 58 PacketList() {}; 59 60 bool empty() const { 61 return packet_list_.empty(); 62 } 63 64 Packet front() const { 65 return packet_list_.front(); 66 } 67 68 void pop_front() { 69 Packet& packet = packet_list_.front(); 70 uint16_t sequence_number = packet.sequence_number_; 71 packet_list_.pop_front(); 72 sequence_number_set_.erase(sequence_number); 73 } 74 75 void push_back(const Packet& packet) { 76 if (sequence_number_set_.find(packet.sequence_number_) == 77 sequence_number_set_.end()) { 78 // Don't insert duplicates. 79 packet_list_.push_back(packet); 80 sequence_number_set_.insert(packet.sequence_number_); 81 } 82 } 83 84 private: 85 std::list<Packet> packet_list_; 86 std::set<uint16_t> sequence_number_set_; 87}; 88 89class IntervalBudget { 90 public: 91 explicit IntervalBudget(int initial_target_rate_kbps) 92 : target_rate_kbps_(initial_target_rate_kbps), 93 bytes_remaining_(0) {} 94 95 void set_target_rate_kbps(int target_rate_kbps) { 96 target_rate_kbps_ = target_rate_kbps; 97 } 98 99 void IncreaseBudget(int delta_time_ms) { 100 int bytes = target_rate_kbps_ * delta_time_ms / 8; 101 if (bytes_remaining_ < 0) { 102 // We overused last interval, compensate this interval. 103 bytes_remaining_ = bytes_remaining_ + bytes; 104 } else { 105 // If we underused last interval we can't use it this interval. 106 bytes_remaining_ = bytes; 107 } 108 } 109 110 void UseBudget(int bytes) { 111 bytes_remaining_ = std::max(bytes_remaining_ - bytes, 112 -500 * target_rate_kbps_ / 8); 113 } 114 115 int bytes_remaining() const { return bytes_remaining_; } 116 117 private: 118 int target_rate_kbps_; 119 int bytes_remaining_; 120}; 121} // namespace paced_sender 122 123PacedSender::PacedSender(Callback* callback, 124 int max_bitrate_kbps, 125 int min_bitrate_kbps) 126 : callback_(callback), 127 enabled_(true), 128 paused_(false), 129 max_queue_length_ms_(kDefaultMaxQueueLengthMs), 130 critsect_(CriticalSectionWrapper::CreateCriticalSection()), 131 media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)), 132 padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)), 133 time_last_update_(TickTime::Now()), 134 capture_time_ms_last_queued_(0), 135 capture_time_ms_last_sent_(0), 136 high_priority_packets_(new paced_sender::PacketList), 137 normal_priority_packets_(new paced_sender::PacketList), 138 low_priority_packets_(new paced_sender::PacketList) { 139 UpdateBytesPerInterval(kMinPacketLimitMs); 140} 141 142PacedSender::~PacedSender() { 143} 144 145void PacedSender::Pause() { 146 CriticalSectionScoped cs(critsect_.get()); 147 paused_ = true; 148} 149 150void PacedSender::Resume() { 151 CriticalSectionScoped cs(critsect_.get()); 152 paused_ = false; 153} 154 155void PacedSender::SetStatus(bool enable) { 156 CriticalSectionScoped cs(critsect_.get()); 157 enabled_ = enable; 158} 159 160bool PacedSender::Enabled() const { 161 CriticalSectionScoped cs(critsect_.get()); 162 return enabled_; 163} 164 165void PacedSender::UpdateBitrate(int max_bitrate_kbps, 166 int min_bitrate_kbps) { 167 CriticalSectionScoped cs(critsect_.get()); 168 media_budget_->set_target_rate_kbps(max_bitrate_kbps); 169 padding_budget_->set_target_rate_kbps(min_bitrate_kbps); 170} 171 172bool PacedSender::SendPacket(Priority priority, uint32_t ssrc, 173 uint16_t sequence_number, int64_t capture_time_ms, int bytes, 174 bool retransmission) { 175 CriticalSectionScoped cs(critsect_.get()); 176 177 if (!enabled_) { 178 return true; // We can send now. 179 } 180 if (capture_time_ms < 0) { 181 capture_time_ms = TickTime::MillisecondTimestamp(); 182 } 183 if (priority != kHighPriority && 184 capture_time_ms > capture_time_ms_last_queued_) { 185 capture_time_ms_last_queued_ = capture_time_ms; 186 TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms, 187 "capture_time_ms", capture_time_ms); 188 } 189 paced_sender::PacketList* packet_list = NULL; 190 switch (priority) { 191 case kHighPriority: 192 packet_list = high_priority_packets_.get(); 193 break; 194 case kNormalPriority: 195 packet_list = normal_priority_packets_.get(); 196 break; 197 case kLowPriority: 198 packet_list = low_priority_packets_.get(); 199 break; 200 } 201 packet_list->push_back(paced_sender::Packet(ssrc, 202 sequence_number, 203 capture_time_ms, 204 TickTime::MillisecondTimestamp(), 205 bytes, 206 retransmission)); 207 return false; 208} 209 210void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) { 211 CriticalSectionScoped cs(critsect_.get()); 212 max_queue_length_ms_ = max_queue_length_ms; 213} 214 215int PacedSender::QueueInMs() const { 216 CriticalSectionScoped cs(critsect_.get()); 217 int64_t now_ms = TickTime::MillisecondTimestamp(); 218 int64_t oldest_packet_enqueue_time = now_ms; 219 if (!high_priority_packets_->empty()) { 220 oldest_packet_enqueue_time = std::min( 221 oldest_packet_enqueue_time, 222 high_priority_packets_->front().enqueue_time_ms_); 223 } 224 if (!normal_priority_packets_->empty()) { 225 oldest_packet_enqueue_time = std::min( 226 oldest_packet_enqueue_time, 227 normal_priority_packets_->front().enqueue_time_ms_); 228 } 229 if (!low_priority_packets_->empty()) { 230 oldest_packet_enqueue_time = std::min( 231 oldest_packet_enqueue_time, 232 low_priority_packets_->front().enqueue_time_ms_); 233 } 234 return now_ms - oldest_packet_enqueue_time; 235} 236 237int32_t PacedSender::TimeUntilNextProcess() { 238 CriticalSectionScoped cs(critsect_.get()); 239 int64_t elapsed_time_ms = 240 (TickTime::Now() - time_last_update_).Milliseconds(); 241 if (elapsed_time_ms <= 0) { 242 return kMinPacketLimitMs; 243 } 244 if (elapsed_time_ms >= kMinPacketLimitMs) { 245 return 0; 246 } 247 return kMinPacketLimitMs - elapsed_time_ms; 248} 249 250int32_t PacedSender::Process() { 251 TickTime now = TickTime::Now(); 252 CriticalSectionScoped cs(critsect_.get()); 253 int elapsed_time_ms = (now - time_last_update_).Milliseconds(); 254 time_last_update_ = now; 255 if (!enabled_) { 256 return 0; 257 } 258 if (!paused_) { 259 if (elapsed_time_ms > 0) { 260 uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); 261 UpdateBytesPerInterval(delta_time_ms); 262 } 263 paced_sender::PacketList* packet_list; 264 while (ShouldSendNextPacket(&packet_list)) { 265 if (!SendPacketFromList(packet_list)) 266 return 0; 267 } 268 if (high_priority_packets_->empty() && 269 normal_priority_packets_->empty() && 270 low_priority_packets_->empty() && 271 padding_budget_->bytes_remaining() > 0) { 272 int padding_needed = padding_budget_->bytes_remaining(); 273 critsect_->Leave(); 274 int bytes_sent = callback_->TimeToSendPadding(padding_needed); 275 critsect_->Enter(); 276 media_budget_->UseBudget(bytes_sent); 277 padding_budget_->UseBudget(bytes_sent); 278 } 279 } 280 return 0; 281} 282 283// MUST have critsect_ when calling. 284bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) 285 EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) { 286 paced_sender::Packet packet = GetNextPacketFromList(packet_list); 287 critsect_->Leave(); 288 289 const bool success = callback_->TimeToSendPacket(packet.ssrc_, 290 packet.sequence_number_, 291 packet.capture_time_ms_, 292 packet.retransmission_); 293 critsect_->Enter(); 294 // If packet cannot be sent then keep it in packet list and exit early. 295 // There's no need to send more packets. 296 if (!success) { 297 return false; 298 } 299 packet_list->pop_front(); 300 const bool last_packet = packet_list->empty() || 301 packet_list->front().capture_time_ms_ > packet.capture_time_ms_; 302 if (packet_list != high_priority_packets_.get()) { 303 if (packet.capture_time_ms_ > capture_time_ms_last_sent_) { 304 capture_time_ms_last_sent_ = packet.capture_time_ms_; 305 } else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ && 306 last_packet) { 307 TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", 308 packet.capture_time_ms_); 309 } 310 } 311 return true; 312} 313 314// MUST have critsect_ when calling. 315void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) { 316 media_budget_->IncreaseBudget(delta_time_ms); 317 padding_budget_->IncreaseBudget(delta_time_ms); 318} 319 320// MUST have critsect_ when calling. 321bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) { 322 *packet_list = NULL; 323 if (media_budget_->bytes_remaining() <= 0) { 324 // All bytes consumed for this interval. 325 // Check if we have not sent in a too long time. 326 if ((TickTime::Now() - time_last_send_).Milliseconds() > 327 kMaxQueueTimeWithoutSendingMs) { 328 if (!high_priority_packets_->empty()) { 329 *packet_list = high_priority_packets_.get(); 330 return true; 331 } 332 if (!normal_priority_packets_->empty()) { 333 *packet_list = normal_priority_packets_.get(); 334 return true; 335 } 336 } 337 // Send any old packets to avoid queuing for too long. 338 if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) { 339 int64_t high_priority_capture_time = -1; 340 if (!high_priority_packets_->empty()) { 341 high_priority_capture_time = 342 high_priority_packets_->front().capture_time_ms_; 343 *packet_list = high_priority_packets_.get(); 344 } 345 if (!normal_priority_packets_->empty() && 346 (high_priority_capture_time == -1 || high_priority_capture_time > 347 normal_priority_packets_->front().capture_time_ms_)) { 348 *packet_list = normal_priority_packets_.get(); 349 } 350 if (*packet_list) 351 return true; 352 } 353 return false; 354 } 355 if (!high_priority_packets_->empty()) { 356 *packet_list = high_priority_packets_.get(); 357 return true; 358 } 359 if (!normal_priority_packets_->empty()) { 360 *packet_list = normal_priority_packets_.get(); 361 return true; 362 } 363 if (!low_priority_packets_->empty()) { 364 *packet_list = low_priority_packets_.get(); 365 return true; 366 } 367 return false; 368} 369 370paced_sender::Packet PacedSender::GetNextPacketFromList( 371 paced_sender::PacketList* packets) { 372 paced_sender::Packet packet = packets->front(); 373 UpdateMediaBytesSent(packet.bytes_); 374 return packet; 375} 376 377// MUST have critsect_ when calling. 378void PacedSender::UpdateMediaBytesSent(int num_bytes) { 379 time_last_send_ = TickTime::Now(); 380 media_budget_->UseBudget(num_bytes); 381 padding_budget_->UseBudget(num_bytes); 382} 383 384} // namespace webrtc 385