1/*
2 *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_single_stream.h"
12
13#include <utility>
14
15#include "webrtc/base/constructormagic.h"
16#include "webrtc/base/logging.h"
17#include "webrtc/base/scoped_ptr.h"
18#include "webrtc/base/thread_annotations.h"
19#include "webrtc/modules/remote_bitrate_estimator/aimd_rate_control.h"
20#include "webrtc/modules/remote_bitrate_estimator/inter_arrival.h"
21#include "webrtc/modules/remote_bitrate_estimator/overuse_detector.h"
22#include "webrtc/modules/remote_bitrate_estimator/overuse_estimator.h"
23#include "webrtc/system_wrappers/include/clock.h"
24#include "webrtc/system_wrappers/include/critical_section_wrapper.h"
25#include "webrtc/typedefs.h"
26
27namespace webrtc {
28
29enum { kTimestampGroupLengthMs = 5 };
30static const double kTimestampToMs = 1.0 / 90.0;
31
32struct RemoteBitrateEstimatorSingleStream::Detector {
33  explicit Detector(int64_t last_packet_time_ms,
34                    const OverUseDetectorOptions& options,
35                    bool enable_burst_grouping)
36      : last_packet_time_ms(last_packet_time_ms),
37        inter_arrival(90 * kTimestampGroupLengthMs,
38                      kTimestampToMs,
39                      enable_burst_grouping),
40        estimator(options),
41        detector(options) {}
42  int64_t last_packet_time_ms;
43  InterArrival inter_arrival;
44  OveruseEstimator estimator;
45  OveruseDetector detector;
46};
47
48  RemoteBitrateEstimatorSingleStream::RemoteBitrateEstimatorSingleStream(
49      RemoteBitrateObserver* observer,
50      Clock* clock)
51      : clock_(clock),
52        incoming_bitrate_(kBitrateWindowMs, 8000),
53        remote_rate_(new AimdRateControl()),
54        observer_(observer),
55        crit_sect_(CriticalSectionWrapper::CreateCriticalSection()),
56        last_process_time_(-1),
57        process_interval_ms_(kProcessIntervalMs) {
58  assert(observer_);
59  LOG(LS_INFO) << "RemoteBitrateEstimatorSingleStream: Instantiating.";
60}
61
62RemoteBitrateEstimatorSingleStream::~RemoteBitrateEstimatorSingleStream() {
63  while (!overuse_detectors_.empty()) {
64    SsrcOveruseEstimatorMap::iterator it = overuse_detectors_.begin();
65    delete it->second;
66    overuse_detectors_.erase(it);
67  }
68}
69
70void RemoteBitrateEstimatorSingleStream::IncomingPacket(int64_t arrival_time_ms,
71                                                        size_t payload_size,
72                                                        const RTPHeader& header,
73                                                        bool was_paced) {
74  uint32_t ssrc = header.ssrc;
75  uint32_t rtp_timestamp = header.timestamp +
76      header.extension.transmissionTimeOffset;
77  int64_t now_ms = clock_->TimeInMilliseconds();
78  CriticalSectionScoped cs(crit_sect_.get());
79  SsrcOveruseEstimatorMap::iterator it = overuse_detectors_.find(ssrc);
80  if (it == overuse_detectors_.end()) {
81    // This is a new SSRC. Adding to map.
82    // TODO(holmer): If the channel changes SSRC the old SSRC will still be
83    // around in this map until the channel is deleted. This is OK since the
84    // callback will no longer be called for the old SSRC. This will be
85    // automatically cleaned up when we have one RemoteBitrateEstimator per REMB
86    // group.
87    std::pair<SsrcOveruseEstimatorMap::iterator, bool> insert_result =
88        overuse_detectors_.insert(std::make_pair(
89            ssrc, new Detector(now_ms, OverUseDetectorOptions(), true)));
90    it = insert_result.first;
91  }
92  Detector* estimator = it->second;
93  estimator->last_packet_time_ms = now_ms;
94  incoming_bitrate_.Update(payload_size, now_ms);
95  const BandwidthUsage prior_state = estimator->detector.State();
96  uint32_t timestamp_delta = 0;
97  int64_t time_delta = 0;
98  int size_delta = 0;
99  if (estimator->inter_arrival.ComputeDeltas(rtp_timestamp, arrival_time_ms,
100                                             payload_size, &timestamp_delta,
101                                             &time_delta, &size_delta)) {
102    double timestamp_delta_ms = timestamp_delta * kTimestampToMs;
103    estimator->estimator.Update(time_delta, timestamp_delta_ms, size_delta,
104                                estimator->detector.State());
105    estimator->detector.Detect(estimator->estimator.offset(),
106                               timestamp_delta_ms,
107                               estimator->estimator.num_of_deltas(), now_ms);
108  }
109  if (estimator->detector.State() == kBwOverusing) {
110    uint32_t incoming_bitrate_bps = incoming_bitrate_.Rate(now_ms);
111    if (prior_state != kBwOverusing ||
112        remote_rate_->TimeToReduceFurther(now_ms, incoming_bitrate_bps)) {
113      // The first overuse should immediately trigger a new estimate.
114      // We also have to update the estimate immediately if we are overusing
115      // and the target bitrate is too high compared to what we are receiving.
116      UpdateEstimate(now_ms);
117    }
118  }
119}
120
121int32_t RemoteBitrateEstimatorSingleStream::Process() {
122  if (TimeUntilNextProcess() > 0) {
123    return 0;
124  }
125  {
126    CriticalSectionScoped cs(crit_sect_.get());
127    UpdateEstimate(clock_->TimeInMilliseconds());
128  }
129  last_process_time_ = clock_->TimeInMilliseconds();
130  return 0;
131}
132
133int64_t RemoteBitrateEstimatorSingleStream::TimeUntilNextProcess() {
134  if (last_process_time_ < 0) {
135    return 0;
136  }
137  {
138    CriticalSectionScoped cs_(crit_sect_.get());
139    return last_process_time_ + process_interval_ms_ -
140        clock_->TimeInMilliseconds();
141  }
142}
143
144void RemoteBitrateEstimatorSingleStream::UpdateEstimate(int64_t now_ms) {
145  BandwidthUsage bw_state = kBwNormal;
146  double sum_var_noise = 0.0;
147  SsrcOveruseEstimatorMap::iterator it = overuse_detectors_.begin();
148  while (it != overuse_detectors_.end()) {
149    const int64_t time_of_last_received_packet =
150        it->second->last_packet_time_ms;
151    if (time_of_last_received_packet >= 0 &&
152        now_ms - time_of_last_received_packet > kStreamTimeOutMs) {
153      // This over-use detector hasn't received packets for |kStreamTimeOutMs|
154      // milliseconds and is considered stale.
155      delete it->second;
156      overuse_detectors_.erase(it++);
157    } else {
158      sum_var_noise += it->second->estimator.var_noise();
159      // Make sure that we trigger an over-use if any of the over-use detectors
160      // is detecting over-use.
161      if (it->second->detector.State() > bw_state) {
162        bw_state = it->second->detector.State();
163      }
164      ++it;
165    }
166  }
167  // We can't update the estimate if we don't have any active streams.
168  if (overuse_detectors_.empty()) {
169    remote_rate_.reset(new AimdRateControl());
170    return;
171  }
172  double mean_noise_var = sum_var_noise /
173      static_cast<double>(overuse_detectors_.size());
174  const RateControlInput input(bw_state,
175                               incoming_bitrate_.Rate(now_ms),
176                               mean_noise_var);
177  remote_rate_->Update(&input, now_ms);
178  unsigned int target_bitrate = remote_rate_->UpdateBandwidthEstimate(now_ms);
179  if (remote_rate_->ValidEstimate()) {
180    process_interval_ms_ = remote_rate_->GetFeedbackInterval();
181    std::vector<unsigned int> ssrcs;
182    GetSsrcs(&ssrcs);
183    observer_->OnReceiveBitrateChanged(ssrcs, target_bitrate);
184  }
185}
186
187void RemoteBitrateEstimatorSingleStream::OnRttUpdate(int64_t avg_rtt_ms,
188                                                     int64_t max_rtt_ms) {
189  CriticalSectionScoped cs(crit_sect_.get());
190  remote_rate_->SetRtt(avg_rtt_ms);
191}
192
193void RemoteBitrateEstimatorSingleStream::RemoveStream(unsigned int ssrc) {
194  CriticalSectionScoped cs(crit_sect_.get());
195  SsrcOveruseEstimatorMap::iterator it = overuse_detectors_.find(ssrc);
196  if (it != overuse_detectors_.end()) {
197    delete it->second;
198    overuse_detectors_.erase(it);
199  }
200}
201
202bool RemoteBitrateEstimatorSingleStream::LatestEstimate(
203    std::vector<unsigned int>* ssrcs,
204    unsigned int* bitrate_bps) const {
205  CriticalSectionScoped cs(crit_sect_.get());
206  assert(bitrate_bps);
207  if (!remote_rate_->ValidEstimate()) {
208    return false;
209  }
210  GetSsrcs(ssrcs);
211  if (ssrcs->empty())
212    *bitrate_bps = 0;
213  else
214    *bitrate_bps = remote_rate_->LatestEstimate();
215  return true;
216}
217
218bool RemoteBitrateEstimatorSingleStream::GetStats(
219    ReceiveBandwidthEstimatorStats* output) const {
220  // Not implemented.
221  return false;
222}
223
224void RemoteBitrateEstimatorSingleStream::GetSsrcs(
225    std::vector<unsigned int>* ssrcs) const {
226  assert(ssrcs);
227  ssrcs->resize(overuse_detectors_.size());
228  int i = 0;
229  for (SsrcOveruseEstimatorMap::const_iterator it = overuse_detectors_.begin();
230      it != overuse_detectors_.end(); ++it, ++i) {
231    (*ssrcs)[i] = it->first;
232  }
233}
234
235void RemoteBitrateEstimatorSingleStream::SetMinBitrate(int min_bitrate_bps) {
236  CriticalSectionScoped cs(crit_sect_.get());
237  remote_rate_->SetMinBitrate(min_bitrate_bps);
238}
239
240}  // namespace webrtc
241