1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/websockets/websocket_throttle.h"
6
7#include <algorithm>
8#include <set>
9#include <string>
10#include <utility>
11
12#include "base/memory/singleton.h"
13#include "base/message_loop/message_loop.h"
14#include "base/strings/string_number_conversions.h"
15#include "base/strings/string_util.h"
16#include "base/strings/stringprintf.h"
17#include "net/base/io_buffer.h"
18#include "net/socket_stream/socket_stream.h"
19#include "net/websockets/websocket_job.h"
20
21namespace net {
22
namespace {

// Upper bound on the number of jobs held in the throttle's |queue_|.
// WebSocketThrottle::PutInQueue() returns false instead of queueing a job
// once that many jobs are already queued.
const size_t kMaxWebSocketJobsThrottled = 1024;

}  // namespace
28
29WebSocketThrottle::WebSocketThrottle() {
30}
31
32WebSocketThrottle::~WebSocketThrottle() {
33  DCHECK(queue_.empty());
34  DCHECK(addr_map_.empty());
35}
36
37// static
38WebSocketThrottle* WebSocketThrottle::GetInstance() {
39  return Singleton<WebSocketThrottle>::get();
40}
41
42bool WebSocketThrottle::PutInQueue(WebSocketJob* job) {
43  if (queue_.size() >= kMaxWebSocketJobsThrottled)
44    return false;
45
46  queue_.push_back(job);
47  const AddressList& address_list = job->address_list();
48  std::set<IPEndPoint> address_set;
49  for (AddressList::const_iterator addr_iter = address_list.begin();
50       addr_iter != address_list.end();
51       ++addr_iter) {
52    const IPEndPoint& address = *addr_iter;
53    // If |address| is already processed, don't do it again.
54    if (!address_set.insert(address).second)
55      continue;
56
57    ConnectingAddressMap::iterator iter = addr_map_.find(address);
58    if (iter == addr_map_.end()) {
59      ConnectingAddressMap::iterator new_queue =
60          addr_map_.insert(make_pair(address, ConnectingQueue())).first;
61      new_queue->second.push_back(job);
62    } else {
63      DCHECK(!iter->second.empty());
64      iter->second.push_back(job);
65      job->SetWaiting();
66      DVLOG(1) << "Waiting on " << address.ToString();
67    }
68  }
69
70  return true;
71}
72
73void WebSocketThrottle::RemoveFromQueue(WebSocketJob* job) {
74  ConnectingQueue::iterator queue_iter =
75      std::find(queue_.begin(), queue_.end(), job);
76  if (queue_iter == queue_.end())
77    return;
78  queue_.erase(queue_iter);
79
80  std::set<WebSocketJob*> wakeup_candidates;
81
82  const AddressList& resolved_address_list = job->address_list();
83  std::set<IPEndPoint> address_set;
84  for (AddressList::const_iterator addr_iter = resolved_address_list.begin();
85       addr_iter != resolved_address_list.end();
86       ++addr_iter) {
87    const IPEndPoint& address = *addr_iter;
88    // If |address| is already processed, don't do it again.
89    if (!address_set.insert(address).second)
90      continue;
91
92    ConnectingAddressMap::iterator map_iter = addr_map_.find(address);
93    DCHECK(map_iter != addr_map_.end());
94
95    ConnectingQueue& per_address_queue = map_iter->second;
96    DCHECK(!per_address_queue.empty());
97    // Job may not be front of the queue if the socket is closed while waiting.
98    ConnectingQueue::iterator per_address_queue_iter =
99        std::find(per_address_queue.begin(), per_address_queue.end(), job);
100    bool was_front = false;
101    if (per_address_queue_iter != per_address_queue.end()) {
102      was_front = (per_address_queue_iter == per_address_queue.begin());
103      per_address_queue.erase(per_address_queue_iter);
104    }
105    if (per_address_queue.empty()) {
106      addr_map_.erase(map_iter);
107    } else if (was_front) {
108      // The new front is a wake-up candidate.
109      wakeup_candidates.insert(per_address_queue.front());
110    }
111  }
112
113  WakeupSocketIfNecessary(wakeup_candidates);
114}
115
116void WebSocketThrottle::WakeupSocketIfNecessary(
117    const std::set<WebSocketJob*>& wakeup_candidates) {
118  for (std::set<WebSocketJob*>::const_iterator iter = wakeup_candidates.begin();
119       iter != wakeup_candidates.end();
120       ++iter) {
121    WebSocketJob* job = *iter;
122    if (!job->IsWaiting())
123      continue;
124
125    bool should_wakeup = true;
126    const AddressList& resolved_address_list = job->address_list();
127    for (AddressList::const_iterator addr_iter = resolved_address_list.begin();
128         addr_iter != resolved_address_list.end();
129         ++addr_iter) {
130      const IPEndPoint& address = *addr_iter;
131      ConnectingAddressMap::iterator map_iter = addr_map_.find(address);
132      DCHECK(map_iter != addr_map_.end());
133      const ConnectingQueue& per_address_queue = map_iter->second;
134      if (job != per_address_queue.front()) {
135        should_wakeup = false;
136        break;
137      }
138    }
139    if (should_wakeup)
140      job->Wakeup();
141  }
142}
143
144}  // namespace net
145