1/*
2 *  Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/modules/remote_bitrate_estimator/test/bwe_test_framework.h"
12
13#include <stdio.h>
14
15#include <sstream>
16
17namespace webrtc {
18namespace testing {
19namespace bwe {
20class DelayCapHelper {
21 public:
22  DelayCapHelper() : max_delay_us_(0), delay_stats_() {}
23
24  void SetMaxDelay(int max_delay_ms) {
25    BWE_TEST_LOGGING_ENABLE(false);
26    BWE_TEST_LOGGING_LOG1("Max Delay", "%d ms", static_cast<int>(max_delay_ms));
27    assert(max_delay_ms >= 0);
28    max_delay_us_ = max_delay_ms * 1000;
29  }
30
31  bool ShouldSendPacket(int64_t send_time_us, int64_t arrival_time_us) {
32    int64_t packet_delay_us = send_time_us - arrival_time_us;
33    delay_stats_.Push(std::min(packet_delay_us, max_delay_us_) / 1000);
34    return (max_delay_us_ == 0 || max_delay_us_ >= packet_delay_us);
35  }
36
37  const Stats<double>& delay_stats() const {
38    return delay_stats_;
39  }
40
41 private:
42  int64_t max_delay_us_;
43  Stats<double> delay_stats_;
44
45  DISALLOW_COPY_AND_ASSIGN(DelayCapHelper);
46};
47
48const FlowIds CreateFlowIds(const int *flow_ids_array, size_t num_flow_ids) {
49  FlowIds flow_ids(&flow_ids_array[0], flow_ids_array + num_flow_ids);
50  return flow_ids;
51}
52
53class RateCounter {
54 public:
55  RateCounter()
56      : kWindowSizeUs(1000000),
57        packets_per_second_(0),
58        bytes_per_second_(0),
59        last_accumulated_us_(0),
60        window_() {}
61
62  void UpdateRates(int64_t send_time_us, uint32_t payload_size) {
63    packets_per_second_++;
64    bytes_per_second_ += payload_size;
65    last_accumulated_us_ = send_time_us;
66    window_.push_back(std::make_pair(send_time_us, payload_size));
67    while (!window_.empty()) {
68      const TimeSizePair& packet = window_.front();
69      if (packet.first > (last_accumulated_us_ - kWindowSizeUs)) {
70        break;
71      }
72      assert(packets_per_second_ >= 1);
73      assert(bytes_per_second_ >= packet.second);
74      packets_per_second_--;
75      bytes_per_second_ -= packet.second;
76      window_.pop_front();
77    }
78  }
79
80  uint32_t bits_per_second() const {
81    return bytes_per_second_ * 8;
82  }
83  uint32_t packets_per_second() const { return packets_per_second_; }
84
85 private:
86  typedef std::pair<int64_t, uint32_t> TimeSizePair;
87
88  const int64_t kWindowSizeUs;
89  uint32_t packets_per_second_;
90  uint32_t bytes_per_second_;
91  int64_t last_accumulated_us_;
92  std::list<TimeSizePair> window_;
93};
94
95Random::Random(uint32_t seed)
96    : a_(0x531FDB97 ^ seed),
97      b_(0x6420ECA8 + seed) {
98}
99
100float Random::Rand() {
101  const float kScale = 1.0f / 0xffffffff;
102  float result = kScale * b_;
103  a_ ^= b_;
104  b_ += a_;
105  return result;
106}
107
108int Random::Gaussian(int mean, int standard_deviation) {
109  // Creating a Normal distribution variable from two independent uniform
110  // variables based on the Box-Muller transform, which is defined on the
111  // interval (0, 1], hence the mask+add below.
112  const double kPi = 3.14159265358979323846;
113  const double kScale = 1.0 / 0x80000000ul;
114  double u1 = kScale * ((a_ & 0x7ffffffful) + 1);
115  double u2 = kScale * ((b_ & 0x7ffffffful) + 1);
116  a_ ^= b_;
117  b_ += a_;
118  return static_cast<int>(mean + standard_deviation *
119      sqrt(-2 * log(u1)) * cos(2 * kPi * u2));
120}
121
122Packet::Packet()
123    : flow_id_(0),
124      creation_time_us_(-1),
125      send_time_us_(-1),
126      payload_size_(0) {
127  memset(&header_, 0, sizeof(header_));
128}
129
130Packet::Packet(int flow_id, int64_t send_time_us, uint32_t payload_size,
131               const RTPHeader& header)
132    : flow_id_(flow_id),
133      creation_time_us_(send_time_us),
134      send_time_us_(send_time_us),
135      payload_size_(payload_size),
136      header_(header) {
137}
138
139Packet::Packet(int64_t send_time_us, uint32_t sequence_number)
140    : flow_id_(0),
141      creation_time_us_(send_time_us),
142      send_time_us_(send_time_us),
143      payload_size_(0) {
144  memset(&header_, 0, sizeof(header_));
145  header_.sequenceNumber = sequence_number;
146}
147
148bool Packet::operator<(const Packet& rhs) const {
149  return send_time_us_ < rhs.send_time_us_;
150}
151
152void Packet::set_send_time_us(int64_t send_time_us) {
153  assert(send_time_us >= 0);
154  send_time_us_ = send_time_us;
155}
156
157void Packet::SetAbsSendTimeMs(int64_t abs_send_time_ms) {
158  header_.extension.absoluteSendTime = ((static_cast<int64_t>(abs_send_time_ms *
159    (1 << 18)) + 500) / 1000) & 0x00fffffful;
160}
161
162bool IsTimeSorted(const Packets& packets) {
163  PacketsConstIt last_it = packets.begin();
164  for (PacketsConstIt it = last_it; it != packets.end(); ++it) {
165    if (it != last_it && *it < *last_it) {
166      return false;
167    }
168    last_it = it;
169  }
170  return true;
171}
172
173PacketProcessor::PacketProcessor(PacketProcessorListener* listener,
174                                 bool is_sender)
175    : listener_(listener), flow_ids_(1, 0) {
176  if (listener_) {
177    listener_->AddPacketProcessor(this, is_sender);
178  }
179}
180
181PacketProcessor::PacketProcessor(PacketProcessorListener* listener,
182                                 const FlowIds& flow_ids,
183                                 bool is_sender)
184    : listener_(listener), flow_ids_(flow_ids) {
185  if (listener_) {
186    listener_->AddPacketProcessor(this, is_sender);
187  }
188}
189
190PacketProcessor::~PacketProcessor() {
191  if (listener_) {
192    listener_->RemovePacketProcessor(this);
193  }
194}
195
196RateCounterFilter::RateCounterFilter(PacketProcessorListener* listener)
197    : PacketProcessor(listener, false),
198      rate_counter_(new RateCounter()),
199      packets_per_second_stats_(),
200      kbps_stats_(),
201      name_("") {}
202
203RateCounterFilter::RateCounterFilter(PacketProcessorListener* listener,
204                                     const std::string& name)
205    : PacketProcessor(listener, false),
206      rate_counter_(new RateCounter()),
207      packets_per_second_stats_(),
208      kbps_stats_(),
209      name_(name) {}
210
211RateCounterFilter::RateCounterFilter(PacketProcessorListener* listener,
212                                     const FlowIds& flow_ids,
213                                     const std::string& name)
214    : PacketProcessor(listener, flow_ids, false),
215      rate_counter_(new RateCounter()),
216      packets_per_second_stats_(),
217      kbps_stats_(),
218      name_(name) {
219  std::stringstream ss;
220  ss << name_ << "_";
221  for (size_t i = 0; i < flow_ids.size(); ++i) {
222    ss << flow_ids[i] << ",";
223  }
224  name_ = ss.str();
225}
226
227RateCounterFilter::~RateCounterFilter() {
228  LogStats();
229}
230
231uint32_t RateCounterFilter::packets_per_second() const {
232  return rate_counter_->packets_per_second();
233}
234
235uint32_t RateCounterFilter::bits_per_second() const {
236  return rate_counter_->bits_per_second();
237}
238
239void RateCounterFilter::LogStats() {
240  BWE_TEST_LOGGING_CONTEXT("RateCounterFilter");
241  packets_per_second_stats_.Log("pps");
242  kbps_stats_.Log("kbps");
243}
244
245Stats<double> RateCounterFilter::GetBitrateStats() const {
246  return kbps_stats_;
247}
248
249void RateCounterFilter::Plot(int64_t timestamp_ms) {
250  BWE_TEST_LOGGING_CONTEXT(name_.c_str());
251  BWE_TEST_LOGGING_PLOT("Throughput_#1", timestamp_ms,
252                        rate_counter_->bits_per_second() / 1000.0);
253}
254
255void RateCounterFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
256  assert(in_out);
257  for (PacketsConstIt it = in_out->begin(); it != in_out->end(); ++it) {
258    rate_counter_->UpdateRates(it->send_time_us(), it->payload_size());
259  }
260  packets_per_second_stats_.Push(rate_counter_->packets_per_second());
261  kbps_stats_.Push(rate_counter_->bits_per_second() / 1000.0);
262}
263
264LossFilter::LossFilter(PacketProcessorListener* listener)
265    : PacketProcessor(listener, false),
266      random_(0x12345678),
267      loss_fraction_(0.0f) {
268}
269
270void LossFilter::SetLoss(float loss_percent) {
271  BWE_TEST_LOGGING_ENABLE(false);
272  BWE_TEST_LOGGING_LOG1("Loss", "%f%%", loss_percent);
273  assert(loss_percent >= 0.0f);
274  assert(loss_percent <= 100.0f);
275  loss_fraction_ = loss_percent * 0.01f;
276}
277
278void LossFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
279  assert(in_out);
280  for (PacketsIt it = in_out->begin(); it != in_out->end(); ) {
281    if (random_.Rand() < loss_fraction_) {
282      it = in_out->erase(it);
283    } else {
284      ++it;
285    }
286  }
287}
288
289DelayFilter::DelayFilter(PacketProcessorListener* listener)
290    : PacketProcessor(listener, false),
291      delay_us_(0),
292      last_send_time_us_(0) {
293}
294
295void DelayFilter::SetDelay(int64_t delay_ms) {
296  BWE_TEST_LOGGING_ENABLE(false);
297  BWE_TEST_LOGGING_LOG1("Delay", "%d ms", static_cast<int>(delay_ms));
298  assert(delay_ms >= 0);
299  delay_us_ = delay_ms * 1000;
300}
301
302void DelayFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
303  assert(in_out);
304  for (PacketsIt it = in_out->begin(); it != in_out->end(); ++it) {
305    int64_t new_send_time_us = it->send_time_us() + delay_us_;
306    last_send_time_us_ = std::max(last_send_time_us_, new_send_time_us);
307    it->set_send_time_us(last_send_time_us_);
308  }
309}
310
311JitterFilter::JitterFilter(PacketProcessorListener* listener)
312    : PacketProcessor(listener, false),
313      random_(0x89674523),
314      stddev_jitter_us_(0),
315      last_send_time_us_(0) {
316}
317
318void JitterFilter::SetJitter(int64_t stddev_jitter_ms) {
319  BWE_TEST_LOGGING_ENABLE(false);
320  BWE_TEST_LOGGING_LOG1("Jitter", "%d ms",
321                        static_cast<int>(stddev_jitter_ms));
322  assert(stddev_jitter_ms >= 0);
323  stddev_jitter_us_ = stddev_jitter_ms * 1000;
324}
325
326void JitterFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
327  assert(in_out);
328  for (PacketsIt it = in_out->begin(); it != in_out->end(); ++it) {
329    int64_t new_send_time_us = it->send_time_us();
330    new_send_time_us += random_.Gaussian(0, stddev_jitter_us_);
331    last_send_time_us_ = std::max(last_send_time_us_, new_send_time_us);
332    it->set_send_time_us(last_send_time_us_);
333  }
334}
335
336ReorderFilter::ReorderFilter(PacketProcessorListener* listener)
337    : PacketProcessor(listener, false),
338      random_(0x27452389),
339      reorder_fraction_(0.0f) {
340}
341
342void ReorderFilter::SetReorder(float reorder_percent) {
343  BWE_TEST_LOGGING_ENABLE(false);
344  BWE_TEST_LOGGING_LOG1("Reordering", "%f%%", reorder_percent);
345  assert(reorder_percent >= 0.0f);
346  assert(reorder_percent <= 100.0f);
347  reorder_fraction_ = reorder_percent * 0.01f;
348}
349
350void ReorderFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
351  assert(in_out);
352  if (in_out->size() >= 2) {
353    PacketsIt last_it = in_out->begin();
354    PacketsIt it = last_it;
355    while (++it != in_out->end()) {
356      if (random_.Rand() < reorder_fraction_) {
357        int64_t t1 = last_it->send_time_us();
358        int64_t t2 = it->send_time_us();
359        std::swap(*last_it, *it);
360        last_it->set_send_time_us(t1);
361        it->set_send_time_us(t2);
362      }
363      last_it = it;
364    }
365  }
366}
367
368ChokeFilter::ChokeFilter(PacketProcessorListener* listener)
369    : PacketProcessor(listener, false),
370      kbps_(1200),
371      last_send_time_us_(0),
372      delay_cap_helper_(new DelayCapHelper()) {
373}
374
375ChokeFilter::ChokeFilter(PacketProcessorListener* listener,
376                         const FlowIds& flow_ids)
377    : PacketProcessor(listener, flow_ids, false),
378      kbps_(1200),
379      last_send_time_us_(0),
380      delay_cap_helper_(new DelayCapHelper()) {
381}
382
383ChokeFilter::~ChokeFilter() {}
384
385void ChokeFilter::SetCapacity(uint32_t kbps) {
386  BWE_TEST_LOGGING_ENABLE(false);
387  BWE_TEST_LOGGING_LOG1("BitrateChoke", "%d kbps", kbps);
388  kbps_ = kbps;
389}
390
391void ChokeFilter::RunFor(int64_t /*time_ms*/, Packets* in_out) {
392  assert(in_out);
393  for (PacketsIt it = in_out->begin(); it != in_out->end(); ) {
394    int64_t earliest_send_time_us = last_send_time_us_ +
395        (it->payload_size() * 8 * 1000 + kbps_ / 2) / kbps_;
396    int64_t new_send_time_us = std::max(it->send_time_us(),
397                                        earliest_send_time_us);
398    if (delay_cap_helper_->ShouldSendPacket(new_send_time_us,
399                                            it->send_time_us())) {
400      it->set_send_time_us(new_send_time_us);
401      last_send_time_us_ = new_send_time_us;
402      ++it;
403    } else {
404      it = in_out->erase(it);
405    }
406  }
407}
408
409void ChokeFilter::SetMaxDelay(int max_delay_ms) {
410  delay_cap_helper_->SetMaxDelay(max_delay_ms);
411}
412
413Stats<double> ChokeFilter::GetDelayStats() const {
414  return delay_cap_helper_->delay_stats();
415}
416
417TraceBasedDeliveryFilter::TraceBasedDeliveryFilter(
418    PacketProcessorListener* listener)
419    : PacketProcessor(listener, false),
420      current_offset_us_(0),
421      delivery_times_us_(),
422      next_delivery_it_(),
423      local_time_us_(-1),
424      rate_counter_(new RateCounter),
425      name_(""),
426      delay_cap_helper_(new DelayCapHelper()),
427      packets_per_second_stats_(),
428      kbps_stats_() {}
429
430TraceBasedDeliveryFilter::TraceBasedDeliveryFilter(
431    PacketProcessorListener* listener,
432    const std::string& name)
433    : PacketProcessor(listener, false),
434      current_offset_us_(0),
435      delivery_times_us_(),
436      next_delivery_it_(),
437      local_time_us_(-1),
438      rate_counter_(new RateCounter),
439      name_(name),
440      delay_cap_helper_(new DelayCapHelper()),
441      packets_per_second_stats_(),
442      kbps_stats_() {}
443
444TraceBasedDeliveryFilter::~TraceBasedDeliveryFilter() {
445}
446
447bool TraceBasedDeliveryFilter::Init(const std::string& filename) {
448  FILE* trace_file = fopen(filename.c_str(), "r");
449  if (!trace_file) {
450    return false;
451  }
452  int64_t first_timestamp = -1;
453  while(!feof(trace_file)) {
454    const size_t kMaxLineLength = 100;
455    char line[kMaxLineLength];
456    if (fgets(line, kMaxLineLength, trace_file)) {
457      std::string line_string(line);
458      std::istringstream buffer(line_string);
459      int64_t timestamp;
460      buffer >> timestamp;
461      timestamp /= 1000;  // Convert to microseconds.
462      if (first_timestamp == -1)
463        first_timestamp = timestamp;
464      assert(delivery_times_us_.empty() ||
465             timestamp - first_timestamp - delivery_times_us_.back() >= 0);
466      delivery_times_us_.push_back(timestamp - first_timestamp);
467    }
468  }
469  assert(!delivery_times_us_.empty());
470  next_delivery_it_ = delivery_times_us_.begin();
471  fclose(trace_file);
472  return true;
473}
474
475void TraceBasedDeliveryFilter::Plot(int64_t timestamp_ms) {
476  BWE_TEST_LOGGING_CONTEXT(name_.c_str());
477  // This plots the max possible throughput of the trace-based delivery filter,
478  // which will be reached if a packet sent on every packet slot of the trace.
479  BWE_TEST_LOGGING_PLOT("MaxThroughput_#1", timestamp_ms,
480                        rate_counter_->bits_per_second() / 1000.0);
481}
482
483void TraceBasedDeliveryFilter::RunFor(int64_t time_ms, Packets* in_out) {
484  assert(in_out);
485  for (PacketsIt it = in_out->begin(); it != in_out->end();) {
486    while (local_time_us_ < it->send_time_us()) {
487      ProceedToNextSlot();
488    }
489    // Drop any packets that have been queued for too long.
490    while (!delay_cap_helper_->ShouldSendPacket(local_time_us_,
491                                                it->send_time_us())) {
492      it = in_out->erase(it);
493      if (it == in_out->end()) {
494        return;
495      }
496    }
497    if (local_time_us_ >= it->send_time_us()) {
498      it->set_send_time_us(local_time_us_);
499      ProceedToNextSlot();
500    }
501    ++it;
502  }
503  packets_per_second_stats_.Push(rate_counter_->packets_per_second());
504  kbps_stats_.Push(rate_counter_->bits_per_second() / 1000.0);
505}
506
507void TraceBasedDeliveryFilter::SetMaxDelay(int max_delay_ms) {
508  delay_cap_helper_->SetMaxDelay(max_delay_ms);
509}
510
511Stats<double> TraceBasedDeliveryFilter::GetDelayStats() const {
512  return delay_cap_helper_->delay_stats();
513}
514
515Stats<double> TraceBasedDeliveryFilter::GetBitrateStats() const {
516  return kbps_stats_;
517}
518
519void TraceBasedDeliveryFilter::ProceedToNextSlot() {
520  if (*next_delivery_it_ <= local_time_us_) {
521    ++next_delivery_it_;
522    if (next_delivery_it_ == delivery_times_us_.end()) {
523      // When the trace wraps we allow two packets to be sent back-to-back.
524      for (TimeList::iterator it = delivery_times_us_.begin();
525           it != delivery_times_us_.end(); ++it) {
526        *it += local_time_us_ - current_offset_us_;
527      }
528      current_offset_us_ += local_time_us_ - current_offset_us_;
529      next_delivery_it_ = delivery_times_us_.begin();
530    }
531  }
532  local_time_us_ = *next_delivery_it_;
533  const int kPayloadSize = 1200;
534  rate_counter_->UpdateRates(local_time_us_, kPayloadSize);
535}
536
537PacketSender::PacketSender(PacketProcessorListener* listener)
538    : PacketProcessor(listener, true) {}
539
540PacketSender::PacketSender(PacketProcessorListener* listener,
541                           const FlowIds& flow_ids)
542    : PacketProcessor(listener, flow_ids, true) {
543
544}
545
546VideoSender::VideoSender(int flow_id, PacketProcessorListener* listener,
547                         float fps, uint32_t kbps, uint32_t ssrc,
548                         float first_frame_offset)
549    : PacketSender(listener, FlowIds(1, flow_id)),
550      kMaxPayloadSizeBytes(1200),
551      kTimestampBase(0xff80ff00ul),
552      frame_period_ms_(1000.0 / fps),
553      bytes_per_second_((1000 * kbps) / 8),
554      frame_size_bytes_(bytes_per_second_ / fps),
555      next_frame_ms_(frame_period_ms_ * first_frame_offset),
556      now_ms_(0.0),
557      prototype_header_() {
558  assert(first_frame_offset >= 0.0f);
559  assert(first_frame_offset < 1.0f);
560  memset(&prototype_header_, 0, sizeof(prototype_header_));
561  prototype_header_.ssrc = ssrc;
562  prototype_header_.sequenceNumber = 0xf000u;
563}
564
565uint32_t VideoSender::GetCapacityKbps() const {
566  return (bytes_per_second_ * 8) / 1000;
567}
568
569void VideoSender::RunFor(int64_t time_ms, Packets* in_out) {
570  assert(in_out);
571  now_ms_ += time_ms;
572  Packets new_packets;
573  while (now_ms_ >= next_frame_ms_) {
574    prototype_header_.timestamp = kTimestampBase +
575        static_cast<uint32_t>(next_frame_ms_ * 90.0);
576    prototype_header_.extension.transmissionTimeOffset = 0;
577
578    // Generate new packets for this frame, all with the same timestamp,
579    // but the payload size is capped, so if the whole frame doesn't fit in
580    // one packet, we will see a number of equally sized packets followed by
581    // one smaller at the tail.
582    int64_t send_time_us = next_frame_ms_ * 1000.0;
583    uint32_t payload_size = frame_size_bytes_;
584    while (payload_size > 0) {
585      ++prototype_header_.sequenceNumber;
586      uint32_t size = std::min(kMaxPayloadSizeBytes, payload_size);
587      new_packets.push_back(Packet(flow_ids()[0], send_time_us, size,
588                                   prototype_header_));
589      new_packets.back().SetAbsSendTimeMs(next_frame_ms_);
590      payload_size -= size;
591    }
592
593    next_frame_ms_ += frame_period_ms_;
594  }
595  in_out->merge(new_packets);
596}
597
598AdaptiveVideoSender::AdaptiveVideoSender(int flow_id,
599                                         PacketProcessorListener* listener,
600                                         float fps,
601                                         uint32_t kbps,
602                                         uint32_t ssrc,
603                                         float first_frame_offset)
604    : VideoSender(flow_id, listener, fps, kbps, ssrc, first_frame_offset) {}
605
606void AdaptiveVideoSender::GiveFeedback(const PacketSender::Feedback& feedback) {
607  bytes_per_second_ = feedback.estimated_bps / 8;
608  frame_size_bytes_ = (bytes_per_second_ * frame_period_ms_ + 500) / 1000;
609}
610
611PacedVideoSender::PacedVideoSender(PacketProcessorListener* listener,
612                                   uint32_t kbps,
613                                   AdaptiveVideoSender* source)
614    // It is important that the first_frame_offset and the initial time of
615    // clock_ are both zero, otherwise we can't have absolute time in this
616    // class.
617    : PacketSender(listener, source->flow_ids()),
618      clock_(0),
619      start_of_run_ms_(0),
620      pacer_(&clock_, this, PacedSender::kDefaultPaceMultiplier * kbps, 0),
621      source_(source) {}
622
623void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) {
624  start_of_run_ms_ = clock_.TimeInMilliseconds();
625  Packets generated_packets;
626  source_->RunFor(time_ms, &generated_packets);
627  Packets::iterator it = generated_packets.begin();
628  // Run process periodically to allow the packets to be paced out.
629  const int kProcessIntervalMs = 10;
630  for (int64_t current_time = 0; current_time < time_ms;
631       current_time += kProcessIntervalMs) {
632    int64_t end_of_interval_us =
633        1000 * (clock_.TimeInMilliseconds() + kProcessIntervalMs);
634    while (it != generated_packets.end() &&
635           end_of_interval_us >= it->send_time_us()) {
636      // Time to send next packet to pacer.
637      pacer_.SendPacket(PacedSender::kNormalPriority,
638                        it->header().ssrc,
639                        it->header().sequenceNumber,
640                        (it->send_time_us() + 500) / 1000,
641                        it->payload_size(),
642                        false);
643      pacer_queue_.push_back(*it);
644      const size_t kMaxPacerQueueSize = 1000;
645      if (pacer_queue_.size() > kMaxPacerQueueSize) {
646        pacer_queue_.pop_front();
647      }
648      ++it;
649    }
650    clock_.AdvanceTimeMilliseconds(kProcessIntervalMs);
651    pacer_.Process();
652  }
653  QueuePackets(in_out, (start_of_run_ms_ + time_ms) * 1000);
654}
655
656void PacedVideoSender::QueuePackets(Packets* batch,
657                                    int64_t end_of_batch_time_us) {
658  queue_.merge(*batch);
659  if (queue_.empty()) {
660    return;
661  }
662  Packets::iterator it = queue_.begin();
663  for (; it != queue_.end(); ++it) {
664    if (it->send_time_us() > end_of_batch_time_us) {
665      break;
666    }
667  }
668  Packets to_transfer;
669  to_transfer.splice(to_transfer.begin(), queue_, queue_.begin(), it);
670  batch->merge(to_transfer);
671}
672
673void PacedVideoSender::GiveFeedback(const PacketSender::Feedback& feedback) {
674  source_->GiveFeedback(feedback);
675  pacer_.UpdateBitrate(
676      PacedSender::kDefaultPaceMultiplier * feedback.estimated_bps / 1000, 0);
677}
678
679bool PacedVideoSender::TimeToSendPacket(uint32_t ssrc,
680                                        uint16_t sequence_number,
681                                        int64_t capture_time_ms,
682                                        bool retransmission) {
683  for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end();
684       ++it) {
685    if (it->header().sequenceNumber == sequence_number) {
686      int64_t pace_out_time_ms = clock_.TimeInMilliseconds();
687      // Make sure a packet is never paced out earlier than when it was put into
688      // the pacer.
689      assert(1000 * pace_out_time_ms >= it->send_time_us());
690      it->SetAbsSendTimeMs(pace_out_time_ms);
691      it->set_send_time_us(1000 * pace_out_time_ms);
692      queue_.push_back(*it);
693      return true;
694    }
695  }
696  return false;
697}
698
699int PacedVideoSender::TimeToSendPadding(int bytes) {
700  return 0;
701}
702}  // namespace bwe
703}  // namespace testing
704}  // namespace webrtc
705