// paced_sender.cc revision 875ad49dee49e95d212a91eb9bb1af327a80ee85
1/* 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11#include "webrtc/modules/pacing/include/paced_sender.h" 12 13#include <assert.h> 14 15#include "webrtc/modules/interface/module_common_types.h" 16#include "webrtc/system_wrappers/interface/clock.h" 17#include "webrtc/system_wrappers/interface/critical_section_wrapper.h" 18#include "webrtc/system_wrappers/interface/trace_event.h" 19 20namespace { 21// Time limit in milliseconds between packet bursts. 22const int kMinPacketLimitMs = 5; 23 24// Upper cap on process interval, in case process has not been called in a long 25// time. 26const int kMaxIntervalTimeMs = 30; 27 28// Max time that the first packet in the queue can sit in the queue if no 29// packets are sent, regardless of buffer state. In practice only in effect at 30// low bitrates (less than 320 kbits/s). 31const int kMaxQueueTimeWithoutSendingMs = 30; 32 33} // namespace 34 35namespace webrtc { 36 37namespace paced_sender { 38struct Packet { 39 Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms, 40 int64_t enqueue_time_ms, int length_in_bytes, bool retransmission) 41 : ssrc_(ssrc), 42 sequence_number_(seq_number), 43 capture_time_ms_(capture_time_ms), 44 enqueue_time_ms_(enqueue_time_ms), 45 bytes_(length_in_bytes), 46 retransmission_(retransmission) { 47 } 48 uint32_t ssrc_; 49 uint16_t sequence_number_; 50 int64_t capture_time_ms_; 51 int64_t enqueue_time_ms_; 52 int bytes_; 53 bool retransmission_; 54}; 55 56// STL list style class which prevents duplicates in the list. 
57class PacketList { 58 public: 59 PacketList() {}; 60 61 bool empty() const { 62 return packet_list_.empty(); 63 } 64 65 Packet front() const { 66 return packet_list_.front(); 67 } 68 69 void pop_front() { 70 Packet& packet = packet_list_.front(); 71 uint16_t sequence_number = packet.sequence_number_; 72 packet_list_.pop_front(); 73 sequence_number_set_.erase(sequence_number); 74 } 75 76 void push_back(const Packet& packet) { 77 if (sequence_number_set_.find(packet.sequence_number_) == 78 sequence_number_set_.end()) { 79 // Don't insert duplicates. 80 packet_list_.push_back(packet); 81 sequence_number_set_.insert(packet.sequence_number_); 82 } 83 } 84 85 private: 86 std::list<Packet> packet_list_; 87 std::set<uint16_t> sequence_number_set_; 88}; 89 90class IntervalBudget { 91 public: 92 explicit IntervalBudget(int initial_target_rate_kbps) 93 : target_rate_kbps_(initial_target_rate_kbps), 94 bytes_remaining_(0) {} 95 96 void set_target_rate_kbps(int target_rate_kbps) { 97 target_rate_kbps_ = target_rate_kbps; 98 } 99 100 void IncreaseBudget(int delta_time_ms) { 101 int bytes = target_rate_kbps_ * delta_time_ms / 8; 102 if (bytes_remaining_ < 0) { 103 // We overused last interval, compensate this interval. 104 bytes_remaining_ = bytes_remaining_ + bytes; 105 } else { 106 // If we underused last interval we can't use it this interval. 
107 bytes_remaining_ = bytes; 108 } 109 } 110 111 void UseBudget(int bytes) { 112 bytes_remaining_ = std::max(bytes_remaining_ - bytes, 113 -500 * target_rate_kbps_ / 8); 114 } 115 116 int bytes_remaining() const { return bytes_remaining_; } 117 118 private: 119 int target_rate_kbps_; 120 int bytes_remaining_; 121}; 122} // namespace paced_sender 123 124const float PacedSender::kDefaultPaceMultiplier = 2.5f; 125 126PacedSender::PacedSender(Clock* clock, 127 Callback* callback, 128 int max_bitrate_kbps, 129 int min_bitrate_kbps) 130 : clock_(clock), 131 callback_(callback), 132 enabled_(true), 133 paused_(false), 134 max_queue_length_ms_(kDefaultMaxQueueLengthMs), 135 critsect_(CriticalSectionWrapper::CreateCriticalSection()), 136 media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)), 137 padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)), 138 time_last_update_(TickTime::Now()), 139 capture_time_ms_last_queued_(0), 140 capture_time_ms_last_sent_(0), 141 high_priority_packets_(new paced_sender::PacketList), 142 normal_priority_packets_(new paced_sender::PacketList), 143 low_priority_packets_(new paced_sender::PacketList) { 144 UpdateBytesPerInterval(kMinPacketLimitMs); 145} 146 147PacedSender::~PacedSender() { 148} 149 150void PacedSender::Pause() { 151 CriticalSectionScoped cs(critsect_.get()); 152 paused_ = true; 153} 154 155void PacedSender::Resume() { 156 CriticalSectionScoped cs(critsect_.get()); 157 paused_ = false; 158} 159 160void PacedSender::SetStatus(bool enable) { 161 CriticalSectionScoped cs(critsect_.get()); 162 enabled_ = enable; 163} 164 165bool PacedSender::Enabled() const { 166 CriticalSectionScoped cs(critsect_.get()); 167 return enabled_; 168} 169 170void PacedSender::UpdateBitrate(int max_bitrate_kbps, 171 int min_bitrate_kbps) { 172 CriticalSectionScoped cs(critsect_.get()); 173 media_budget_->set_target_rate_kbps(max_bitrate_kbps); 174 padding_budget_->set_target_rate_kbps(min_bitrate_kbps); 175} 176 177bool 
PacedSender::SendPacket(Priority priority, uint32_t ssrc, 178 uint16_t sequence_number, int64_t capture_time_ms, int bytes, 179 bool retransmission) { 180 CriticalSectionScoped cs(critsect_.get()); 181 182 if (!enabled_) { 183 return true; // We can send now. 184 } 185 if (capture_time_ms < 0) { 186 capture_time_ms = clock_->TimeInMilliseconds(); 187 } 188 if (priority != kHighPriority && 189 capture_time_ms > capture_time_ms_last_queued_) { 190 capture_time_ms_last_queued_ = capture_time_ms; 191 TRACE_EVENT_ASYNC_BEGIN1("webrtc_rtp", "PacedSend", capture_time_ms, 192 "capture_time_ms", capture_time_ms); 193 } 194 paced_sender::PacketList* packet_list = NULL; 195 switch (priority) { 196 case kHighPriority: 197 packet_list = high_priority_packets_.get(); 198 break; 199 case kNormalPriority: 200 packet_list = normal_priority_packets_.get(); 201 break; 202 case kLowPriority: 203 packet_list = low_priority_packets_.get(); 204 break; 205 } 206 packet_list->push_back(paced_sender::Packet(ssrc, 207 sequence_number, 208 capture_time_ms, 209 clock_->TimeInMilliseconds(), 210 bytes, 211 retransmission)); 212 return false; 213} 214 215void PacedSender::set_max_queue_length_ms(int max_queue_length_ms) { 216 CriticalSectionScoped cs(critsect_.get()); 217 max_queue_length_ms_ = max_queue_length_ms; 218} 219 220int PacedSender::QueueInMs() const { 221 CriticalSectionScoped cs(critsect_.get()); 222 int64_t now_ms = clock_->TimeInMilliseconds(); 223 int64_t oldest_packet_enqueue_time = now_ms; 224 if (!high_priority_packets_->empty()) { 225 oldest_packet_enqueue_time = std::min( 226 oldest_packet_enqueue_time, 227 high_priority_packets_->front().enqueue_time_ms_); 228 } 229 if (!normal_priority_packets_->empty()) { 230 oldest_packet_enqueue_time = std::min( 231 oldest_packet_enqueue_time, 232 normal_priority_packets_->front().enqueue_time_ms_); 233 } 234 if (!low_priority_packets_->empty()) { 235 oldest_packet_enqueue_time = std::min( 236 oldest_packet_enqueue_time, 237 
low_priority_packets_->front().enqueue_time_ms_); 238 } 239 return now_ms - oldest_packet_enqueue_time; 240} 241 242int32_t PacedSender::TimeUntilNextProcess() { 243 CriticalSectionScoped cs(critsect_.get()); 244 int64_t elapsed_time_ms = 245 (TickTime::Now() - time_last_update_).Milliseconds(); 246 if (elapsed_time_ms <= 0) { 247 return kMinPacketLimitMs; 248 } 249 if (elapsed_time_ms >= kMinPacketLimitMs) { 250 return 0; 251 } 252 return kMinPacketLimitMs - elapsed_time_ms; 253} 254 255int32_t PacedSender::Process() { 256 TickTime now = TickTime::Now(); 257 CriticalSectionScoped cs(critsect_.get()); 258 int elapsed_time_ms = (now - time_last_update_).Milliseconds(); 259 time_last_update_ = now; 260 if (!enabled_) { 261 return 0; 262 } 263 if (!paused_) { 264 if (elapsed_time_ms > 0) { 265 uint32_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); 266 UpdateBytesPerInterval(delta_time_ms); 267 } 268 paced_sender::PacketList* packet_list; 269 while (ShouldSendNextPacket(&packet_list)) { 270 if (!SendPacketFromList(packet_list)) 271 return 0; 272 } 273 if (high_priority_packets_->empty() && 274 normal_priority_packets_->empty() && 275 low_priority_packets_->empty() && 276 padding_budget_->bytes_remaining() > 0) { 277 int padding_needed = padding_budget_->bytes_remaining(); 278 critsect_->Leave(); 279 int bytes_sent = callback_->TimeToSendPadding(padding_needed); 280 critsect_->Enter(); 281 media_budget_->UseBudget(bytes_sent); 282 padding_budget_->UseBudget(bytes_sent); 283 } 284 } 285 return 0; 286} 287 288// MUST have critsect_ when calling. 
289bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list) 290 EXCLUSIVE_LOCKS_REQUIRED(critsect_.get()) { 291 paced_sender::Packet packet = GetNextPacketFromList(packet_list); 292 critsect_->Leave(); 293 294 const bool success = callback_->TimeToSendPacket(packet.ssrc_, 295 packet.sequence_number_, 296 packet.capture_time_ms_, 297 packet.retransmission_); 298 critsect_->Enter(); 299 // If packet cannot be sent then keep it in packet list and exit early. 300 // There's no need to send more packets. 301 if (!success) { 302 return false; 303 } 304 packet_list->pop_front(); 305 const bool last_packet = packet_list->empty() || 306 packet_list->front().capture_time_ms_ > packet.capture_time_ms_; 307 if (packet_list != high_priority_packets_.get()) { 308 if (packet.capture_time_ms_ > capture_time_ms_last_sent_) { 309 capture_time_ms_last_sent_ = packet.capture_time_ms_; 310 } else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ && 311 last_packet) { 312 TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", 313 packet.capture_time_ms_); 314 } 315 } 316 return true; 317} 318 319// MUST have critsect_ when calling. 320void PacedSender::UpdateBytesPerInterval(uint32_t delta_time_ms) { 321 media_budget_->IncreaseBudget(delta_time_ms); 322 padding_budget_->IncreaseBudget(delta_time_ms); 323} 324 325// MUST have critsect_ when calling. 326bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) { 327 *packet_list = NULL; 328 if (media_budget_->bytes_remaining() <= 0) { 329 // All bytes consumed for this interval. 330 // Check if we have not sent in a too long time. 
331 if ((TickTime::Now() - time_last_send_).Milliseconds() > 332 kMaxQueueTimeWithoutSendingMs) { 333 if (!high_priority_packets_->empty()) { 334 *packet_list = high_priority_packets_.get(); 335 return true; 336 } 337 if (!normal_priority_packets_->empty()) { 338 *packet_list = normal_priority_packets_.get(); 339 return true; 340 } 341 } 342 // Send any old packets to avoid queuing for too long. 343 if (max_queue_length_ms_ >= 0 && QueueInMs() > max_queue_length_ms_) { 344 int64_t high_priority_capture_time = -1; 345 if (!high_priority_packets_->empty()) { 346 high_priority_capture_time = 347 high_priority_packets_->front().capture_time_ms_; 348 *packet_list = high_priority_packets_.get(); 349 } 350 if (!normal_priority_packets_->empty() && 351 (high_priority_capture_time == -1 || high_priority_capture_time > 352 normal_priority_packets_->front().capture_time_ms_)) { 353 *packet_list = normal_priority_packets_.get(); 354 } 355 if (*packet_list) 356 return true; 357 } 358 return false; 359 } 360 if (!high_priority_packets_->empty()) { 361 *packet_list = high_priority_packets_.get(); 362 return true; 363 } 364 if (!normal_priority_packets_->empty()) { 365 *packet_list = normal_priority_packets_.get(); 366 return true; 367 } 368 if (!low_priority_packets_->empty()) { 369 *packet_list = low_priority_packets_.get(); 370 return true; 371 } 372 return false; 373} 374 375paced_sender::Packet PacedSender::GetNextPacketFromList( 376 paced_sender::PacketList* packets) { 377 paced_sender::Packet packet = packets->front(); 378 UpdateMediaBytesSent(packet.bytes_); 379 return packet; 380} 381 382// MUST have critsect_ when calling. 383void PacedSender::UpdateMediaBytesSent(int num_bytes) { 384 time_last_send_ = TickTime::Now(); 385 media_budget_->UseBudget(num_bytes); 386 padding_budget_->UseBudget(num_bytes); 387} 388 389} // namespace webrtc 390