1// Copyright (c) 2009 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/tools/flip_server/output_ordering.h"
6
7#include "net/tools/flip_server/flip_config.h"
8#include "net/tools/flip_server/sm_connection.h"
9
10
11namespace net {
12
13// static
14double OutputOrdering::server_think_time_in_s_ = 0.0;
15
16OutputOrdering::OutputOrdering(SMConnectionInterface* connection)
17    : first_data_senders_threshold_(kInitialDataSendersThreshold),
18      connection_(connection) {
19  if (connection)
20    epoll_server_ = connection->epoll_server();
21}
22
23OutputOrdering::~OutputOrdering() {}
24
25void OutputOrdering::Reset() {
26  while (!stream_ids_.empty()) {
27    StreamIdToPriorityMap::iterator sitpmi = stream_ids_.begin();
28    PriorityMapPointer& pmp = sitpmi->second;
29    if (pmp.alarm_enabled) {
30      epoll_server_->UnregisterAlarm(pmp.alarm_token);
31    }
32    stream_ids_.erase(sitpmi);
33  }
34  priority_map_.clear();
35  first_data_senders_.clear();
36}
37
38bool OutputOrdering::ExistsInPriorityMaps(uint32 stream_id) {
39  StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id);
40  return sitpmi != stream_ids_.end();
41}
42
43OutputOrdering::BeginOutputtingAlarm::BeginOutputtingAlarm(
44    OutputOrdering* oo,
45    OutputOrdering::PriorityMapPointer* pmp,
46    const MemCacheIter& mci)
47    : output_ordering_(oo),
48      pmp_(pmp),
49      mci_(mci),
50      epoll_server_(NULL) {
51}
52
53OutputOrdering::BeginOutputtingAlarm::~BeginOutputtingAlarm() {
54  if (epoll_server_ && pmp_->alarm_enabled)
55    epoll_server_->UnregisterAlarm(pmp_->alarm_token);
56}
57
58int64 OutputOrdering::BeginOutputtingAlarm::OnAlarm() {
59  OnUnregistration();
60  output_ordering_->MoveToActive(pmp_, mci_);
61  VLOG(2) << "ON ALARM! Should now start to output...";
62  delete this;
63  return 0;
64}
65
66void OutputOrdering::BeginOutputtingAlarm::OnRegistration(
67    const EpollServer::AlarmRegToken& tok,
68    EpollServer* eps) {
69  epoll_server_ = eps;
70  pmp_->alarm_token = tok;
71  pmp_->alarm_enabled = true;
72}
73
74void OutputOrdering::BeginOutputtingAlarm::OnUnregistration() {
75  pmp_->alarm_enabled = false;
76}
77
78void OutputOrdering::BeginOutputtingAlarm::OnShutdown(EpollServer* eps) {
79  OnUnregistration();
80}
81
82void OutputOrdering::MoveToActive(PriorityMapPointer* pmp, MemCacheIter mci) {
83  VLOG(2) << "Moving to active!";
84  first_data_senders_.push_back(mci);
85  pmp->ring = &first_data_senders_;
86  pmp->it = first_data_senders_.end();
87  --pmp->it;
88  connection_->ReadyToSend();
89}
90
91void OutputOrdering::AddToOutputOrder(const MemCacheIter& mci) {
92  if (ExistsInPriorityMaps(mci.stream_id))
93    LOG(ERROR) << "OOps, already was inserted here?!";
94
95  double think_time_in_s = server_think_time_in_s_;
96  std::string x_server_latency =
97    mci.file_data->headers->GetHeader("X-Server-Latency").as_string();
98  if (!x_server_latency.empty()) {
99    char* endp;
100    double tmp_think_time_in_s = strtod(x_server_latency.c_str(), &endp);
101    if (endp != x_server_latency.c_str() + x_server_latency.size()) {
102      LOG(ERROR) << "Unable to understand X-Server-Latency of: "
103                 << x_server_latency << " for resource: "
104                 <<  mci.file_data->filename.c_str();
105    } else {
106      think_time_in_s = tmp_think_time_in_s;
107    }
108  }
109  StreamIdToPriorityMap::iterator sitpmi;
110  sitpmi = stream_ids_.insert(
111      std::pair<uint32, PriorityMapPointer>(mci.stream_id,
112                                            PriorityMapPointer())).first;
113  PriorityMapPointer& pmp = sitpmi->second;
114
115  BeginOutputtingAlarm* boa = new BeginOutputtingAlarm(this, &pmp, mci);
116  VLOG(1) << "Server think time: " << think_time_in_s;
117  epoll_server_->RegisterAlarmApproximateDelta(
118      think_time_in_s * 1000000, boa);
119}
120
121void OutputOrdering::SpliceToPriorityRing(PriorityRing::iterator pri) {
122  MemCacheIter& mci = *pri;
123  PriorityMap::iterator pmi = priority_map_.find(mci.priority);
124  if (pmi == priority_map_.end()) {
125    pmi = priority_map_.insert(
126        std::pair<uint32, PriorityRing>(mci.priority, PriorityRing())).first;
127  }
128
129  pmi->second.splice(pmi->second.end(),
130                     first_data_senders_,
131                     pri);
132  StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(mci.stream_id);
133  sitpmi->second.ring = &(pmi->second);
134}
135
136MemCacheIter* OutputOrdering::GetIter() {
137  while (!first_data_senders_.empty()) {
138    MemCacheIter& mci = first_data_senders_.front();
139    if (mci.bytes_sent >= first_data_senders_threshold_) {
140      SpliceToPriorityRing(first_data_senders_.begin());
141    } else {
142      first_data_senders_.splice(first_data_senders_.end(),
143                                first_data_senders_,
144                                first_data_senders_.begin());
145      mci.max_segment_size = kInitialDataSendersThreshold;
146      return &mci;
147    }
148  }
149  while (!priority_map_.empty()) {
150    PriorityRing& first_ring = priority_map_.begin()->second;
151    if (first_ring.empty()) {
152      priority_map_.erase(priority_map_.begin());
153      continue;
154    }
155    MemCacheIter& mci = first_ring.front();
156    first_ring.splice(first_ring.end(),
157                      first_ring,
158                      first_ring.begin());
159    mci.max_segment_size = kSpdySegmentSize;
160    return &mci;
161  }
162  return NULL;
163}
164
165void OutputOrdering::RemoveStreamId(uint32 stream_id) {
166  StreamIdToPriorityMap::iterator sitpmi = stream_ids_.find(stream_id);
167  if (sitpmi == stream_ids_.end())
168    return;
169
170  PriorityMapPointer& pmp = sitpmi->second;
171  if (pmp.alarm_enabled)
172    epoll_server_->UnregisterAlarm(pmp.alarm_token);
173  else
174    pmp.ring->erase(pmp.it);
175  stream_ids_.erase(sitpmi);
176}
177
178}  // namespace net
179
180