1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "content/renderer/p2p/ipc_socket_factory.h"
6
7#include <deque>
8
9#include "base/compiler_specific.h"
10#include "base/debug/trace_event.h"
11#include "base/message_loop/message_loop.h"
12#include "base/message_loop/message_loop_proxy.h"
13#include "content/renderer/p2p/socket_client.h"
14#include "content/renderer/p2p/socket_dispatcher.h"
15#include "jingle/glue/utils.h"
16#include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
17
18namespace content {
19
20namespace {
21
22bool IsTcpClientSocket(P2PSocketType type) {
23  return (type == P2P_SOCKET_STUN_TCP_CLIENT) ||
24         (type == P2P_SOCKET_TCP_CLIENT) ||
25         (type == P2P_SOCKET_STUN_SSLTCP_CLIENT) ||
26         (type == P2P_SOCKET_SSLTCP_CLIENT) ||
27         (type == P2P_SOCKET_TLS_CLIENT) ||
28         (type == P2P_SOCKET_STUN_TLS_CLIENT);
29}
30
// Send budget, in bytes, that every IpcPacketSocket starts with. It bounds
// the total size of packets that have been passed to the socket client but
// not yet acknowledged through OnSendComplete().
// TODO(miu): This needs tuning.  http://crbug.com/237960
const size_t kMaximumInFlightBytes = 64 * 1024;  // 64 KB
33
34// IpcPacketSocket implements talk_base::AsyncPacketSocket interface
35// using P2PSocketClient that works over IPC-channel. It must be used
36// on the thread it was created.
37class IpcPacketSocket : public talk_base::AsyncPacketSocket,
38                        public P2PSocketClient::Delegate {
39 public:
40  IpcPacketSocket();
41  virtual ~IpcPacketSocket();
42
43  // Always takes ownership of client even if initialization fails.
44  bool Init(P2PSocketType type, P2PSocketClient* client,
45            const talk_base::SocketAddress& local_address,
46            const talk_base::SocketAddress& remote_address);
47
48  // talk_base::AsyncPacketSocket interface.
49  virtual talk_base::SocketAddress GetLocalAddress() const OVERRIDE;
50  virtual talk_base::SocketAddress GetRemoteAddress() const OVERRIDE;
51  virtual int Send(const void *pv, size_t cb) OVERRIDE;
52  virtual int SendTo(const void *pv, size_t cb,
53                     const talk_base::SocketAddress& addr) OVERRIDE;
54  virtual int Close() OVERRIDE;
55  virtual State GetState() const OVERRIDE;
56  virtual int GetOption(talk_base::Socket::Option opt, int* value) OVERRIDE;
57  virtual int SetOption(talk_base::Socket::Option opt, int value) OVERRIDE;
58  virtual int GetError() const OVERRIDE;
59  virtual void SetError(int error) OVERRIDE;
60
61  // P2PSocketClient::Delegate implementation.
62  virtual void OnOpen(const net::IPEndPoint& address) OVERRIDE;
63  virtual void OnIncomingTcpConnection(const net::IPEndPoint& address,
64                                       P2PSocketClient* client) OVERRIDE;
65  virtual void OnSendComplete() OVERRIDE;
66  virtual void OnError() OVERRIDE;
67  virtual void OnDataReceived(const net::IPEndPoint& address,
68                              const std::vector<char>& data) OVERRIDE;
69
70 private:
71  enum InternalState {
72    IS_UNINITIALIZED,
73    IS_OPENING,
74    IS_OPEN,
75    IS_CLOSED,
76    IS_ERROR,
77  };
78
79  // Update trace of send throttling internal state. This should be called
80  // immediately after any changes to |send_bytes_available_| and/or
81  // |in_flight_packet_sizes_|.
82  void TraceSendThrottlingState() const;
83
84  void InitAcceptedTcp(P2PSocketClient* client,
85                       const talk_base::SocketAddress& local_address,
86                       const talk_base::SocketAddress& remote_address);
87  P2PSocketType type_;
88
89  // Message loop on which this socket was created and being used.
90  base::MessageLoop* message_loop_;
91
92  // Corresponding P2P socket client.
93  scoped_refptr<P2PSocketClient> client_;
94
95  // Local address is allocated by the browser process, and the
96  // renderer side doesn't know the address until it receives OnOpen()
97  // event from the browser.
98  talk_base::SocketAddress local_address_;
99
100  // Remote address for client TCP connections.
101  talk_base::SocketAddress remote_address_;
102
103  // Current state of the object.
104  InternalState state_;
105
106  // Track the number of bytes allowed to be sent non-blocking. This is used to
107  // throttle the sending of packets to the browser process. For each packet
108  // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs
109  // from the browser process) are made, the value is increased back. This
110  // allows short bursts of high-rate sending without dropping packets, but
111  // quickly restricts the client to a sustainable steady-state rate.
112  size_t send_bytes_available_;
113  std::deque<size_t> in_flight_packet_sizes_;
114
115  // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
116  // caller expects SignalWritable notification.
117  bool writable_signal_expected_;
118
119  // Current error code. Valid when state_ == IS_ERROR.
120  int error_;
121
122  DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket);
123};
124
// Creates a socket in the IS_UNINITIALIZED state with the whole send budget
// (kMaximumInFlightBytes) available and no pending error. The current message
// loop is remembered so that later calls can DCHECK they run on the same one.
IpcPacketSocket::IpcPacketSocket()
    : type_(P2P_SOCKET_UDP),
      message_loop_(base::MessageLoop::current()),
      state_(IS_UNINITIALIZED),
      send_bytes_available_(kMaximumInFlightBytes),
      writable_signal_expected_(false),
      error_(0) {
  // With a zero budget every non-empty SendTo() would exceed
  // |send_bytes_available_| and be rejected.
  COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate);
}
134
135IpcPacketSocket::~IpcPacketSocket() {
136  if (state_ == IS_OPENING || state_ == IS_OPEN ||
137      state_ == IS_ERROR) {
138    Close();
139  }
140}
141
// Emits two counters in the "p2p" trace category, both identified by the
// local port: the remaining send budget in bytes and the number of packets
// currently in flight.
void IpcPacketSocket::TraceSendThrottlingState() const {
  TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(),
                    send_bytes_available_);
  TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(),
                    in_flight_packet_sizes_.size());
}
148
149bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client,
150                           const talk_base::SocketAddress& local_address,
151                           const talk_base::SocketAddress& remote_address) {
152  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
153  DCHECK_EQ(state_, IS_UNINITIALIZED);
154
155  type_ = type;
156  client_ = client;
157  local_address_ = local_address;
158  remote_address_ = remote_address;
159  state_ = IS_OPENING;
160
161  net::IPEndPoint local_endpoint;
162  if (!jingle_glue::SocketAddressToIPEndPoint(
163          local_address, &local_endpoint)) {
164    return false;
165  }
166
167  net::IPEndPoint remote_endpoint;
168  if (!remote_address.IsNil() &&
169      !jingle_glue::SocketAddressToIPEndPoint(
170          remote_address, &remote_endpoint)) {
171    return false;
172  }
173
174  client_->Init(type, local_endpoint, remote_endpoint, this);
175
176  return true;
177}
178
179void IpcPacketSocket::InitAcceptedTcp(
180    P2PSocketClient* client,
181    const talk_base::SocketAddress& local_address,
182    const talk_base::SocketAddress& remote_address) {
183  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
184  DCHECK_EQ(state_, IS_UNINITIALIZED);
185
186  client_ = client;
187  local_address_ = local_address;
188  remote_address_ = remote_address;
189  state_ = IS_OPEN;
190  TraceSendThrottlingState();
191  client_->set_delegate(this);
192}
193
194// talk_base::AsyncPacketSocket interface.
// Returns a copy of the stored local address: the value given to Init() or
// InitAcceptedTcp(), or the one written by OnOpen().
talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  return local_address_;
}
199
// Returns a copy of the remote address given to Init() or InitAcceptedTcp().
talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  return remote_address_;
}
204
// Sends |data_size| bytes from |data| to the stored remote address by
// forwarding to SendTo(); the return value is whatever SendTo() returns.
int IpcPacketSocket::Send(const void *data, size_t data_size) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  return SendTo(data, data_size, remote_address_);
}
209
210int IpcPacketSocket::SendTo(const void *data, size_t data_size,
211                            const talk_base::SocketAddress& address) {
212  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
213
214  switch (state_) {
215    case IS_UNINITIALIZED:
216      NOTREACHED();
217      return EWOULDBLOCK;
218    case IS_OPENING:
219      return EWOULDBLOCK;
220    case IS_CLOSED:
221      return ENOTCONN;
222    case IS_ERROR:
223      return error_;
224    case IS_OPEN:
225      // Continue sending the packet.
226      break;
227  }
228
229  if (data_size == 0) {
230    NOTREACHED();
231    return 0;
232  }
233
234  if (data_size > send_bytes_available_) {
235    TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
236                         TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id());
237    writable_signal_expected_ = true;
238    error_ = EWOULDBLOCK;
239    return -1;
240  }
241
242  net::IPEndPoint address_chrome;
243  if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
244    NOTREACHED();
245    error_ = EINVAL;
246    return -1;
247  }
248
249  send_bytes_available_ -= data_size;
250  in_flight_packet_sizes_.push_back(data_size);
251  TraceSendThrottlingState();
252
253  const char* data_char = reinterpret_cast<const char*>(data);
254  std::vector<char> data_vector(data_char, data_char + data_size);
255  client_->Send(address_chrome, data_vector);
256
257  // Fake successful send. The caller ignores result anyway.
258  return data_size;
259}
260
// Closes the underlying socket client and moves this object to IS_CLOSED.
// Always returns 0. |client_| is dereferenced unconditionally, so a client
// must have been set by Init() or InitAcceptedTcp() before this is called.
int IpcPacketSocket::Close() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  client_->Close();
  state_ = IS_CLOSED;

  return 0;
}
269
270talk_base::AsyncPacketSocket::State IpcPacketSocket::GetState() const {
271  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
272
273  switch (state_) {
274    case IS_UNINITIALIZED:
275      NOTREACHED();
276      return STATE_CLOSED;
277
278    case IS_OPENING:
279      return STATE_BINDING;
280
281    case IS_OPEN:
282      if (IsTcpClientSocket(type_)) {
283        return STATE_CONNECTED;
284      } else {
285        return STATE_BOUND;
286      }
287
288    case IS_CLOSED:
289    case IS_ERROR:
290      return STATE_CLOSED;
291  }
292
293  NOTREACHED();
294  return STATE_CLOSED;
295}
296
// Socket options are not supported: always returns -1 and leaves |value|
// untouched.
int IpcPacketSocket::GetOption(talk_base::Socket::Option opt, int* value) {
  // We don't support socket options for IPC sockets.
  return -1;
}
301
// Socket options are not supported: always returns -1 and ignores both
// arguments.
int IpcPacketSocket::SetOption(talk_base::Socket::Option opt, int value) {
  // We don't support socket options for IPC sockets.
  return -1;
}
306
// Returns the most recently stored error code (0 if none has been stored
// since construction).
int IpcPacketSocket::GetError() const {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  return error_;
}
311
// Overwrites the stored error code with |error|. The socket state is not
// changed.
void IpcPacketSocket::SetError(int error) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  error_ = error;
}
316
317void IpcPacketSocket::OnOpen(const net::IPEndPoint& address) {
318  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
319
320  if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) {
321    // Always expect correct IPv4 address to be allocated.
322    NOTREACHED();
323    OnError();
324    return;
325  }
326
327  state_ = IS_OPEN;
328  TraceSendThrottlingState();
329
330  SignalAddressReady(this, local_address_);
331  if (IsTcpClientSocket(type_))
332    SignalConnect(this);
333}
334
335void IpcPacketSocket::OnIncomingTcpConnection(
336    const net::IPEndPoint& address,
337    P2PSocketClient* client) {
338  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
339
340  scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
341
342  talk_base::SocketAddress remote_address;
343  if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) {
344    // Always expect correct IPv4 address to be allocated.
345    NOTREACHED();
346  }
347  socket->InitAcceptedTcp(client, local_address_, remote_address);
348  SignalNewConnection(this, socket.release());
349}
350
351void IpcPacketSocket::OnSendComplete() {
352  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
353
354  CHECK(!in_flight_packet_sizes_.empty());
355  send_bytes_available_ += in_flight_packet_sizes_.front();
356  DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes);
357  in_flight_packet_sizes_.pop_front();
358  TraceSendThrottlingState();
359
360  if (writable_signal_expected_ && send_bytes_available_ > 0) {
361    SignalReadyToSend(this);
362    writable_signal_expected_ = false;
363  }
364}
365
// P2PSocketClient::Delegate: the socket failed. Moves to IS_ERROR and records
// ECONNABORTED as the error code. No signal is raised from here.
void IpcPacketSocket::OnError() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  state_ = IS_ERROR;
  error_ = ECONNABORTED;
}
371
372void IpcPacketSocket::OnDataReceived(const net::IPEndPoint& address,
373                                     const std::vector<char>& data) {
374  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
375
376  talk_base::SocketAddress address_lj;
377  if (!jingle_glue::IPEndPointToSocketAddress(address, &address_lj)) {
378    // We should always be able to convert address here because we
379    // don't expect IPv6 address on IPv4 connections.
380    NOTREACHED();
381    return;
382  }
383
384  SignalReadPacket(this, &data[0], data.size(), address_lj);
385}
386
387}  // namespace
388
// Stores |socket_dispatcher|, which is handed to every P2PSocketClient this
// factory creates.
IpcPacketSocketFactory::IpcPacketSocketFactory(
    P2PSocketDispatcher* socket_dispatcher)
    : socket_dispatcher_(socket_dispatcher) {
}
393
// Nothing to release explicitly; the destructor body is empty.
IpcPacketSocketFactory::~IpcPacketSocketFactory() {
}
396
397talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateUdpSocket(
398    const talk_base::SocketAddress& local_address, int min_port, int max_port) {
399  talk_base::SocketAddress crome_address;
400  P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_);
401  scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
402  // TODO(sergeyu): Respect local_address and port limits here (need
403  // to pass them over IPC channel to the browser).
404  if (!socket->Init(P2P_SOCKET_UDP, socket_client,
405                    local_address, talk_base::SocketAddress())) {
406    return NULL;
407  }
408  return socket.release();
409}
410
411talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateServerTcpSocket(
412    const talk_base::SocketAddress& local_address, int min_port, int max_port,
413    int opts) {
414  // TODO(sergeyu): Implement SSL support.
415  if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP)
416    return NULL;
417
418  P2PSocketType type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
419      P2P_SOCKET_STUN_TCP_SERVER : P2P_SOCKET_TCP_SERVER;
420  P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_);
421  scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
422  if (!socket->Init(type, socket_client, local_address,
423                    talk_base::SocketAddress())) {
424    return NULL;
425  }
426  return socket.release();
427}
428
429talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateClientTcpSocket(
430    const talk_base::SocketAddress& local_address,
431    const talk_base::SocketAddress& remote_address,
432    const talk_base::ProxyInfo& proxy_info,
433    const std::string& user_agent, int opts) {
434  P2PSocketType type;
435  if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) {
436    type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
437        P2P_SOCKET_STUN_SSLTCP_CLIENT : P2P_SOCKET_SSLTCP_CLIENT;
438  } else if (opts & talk_base::PacketSocketFactory::OPT_TLS) {
439    type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
440        P2P_SOCKET_STUN_TLS_CLIENT : P2P_SOCKET_TLS_CLIENT;
441  } else {
442    type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ?
443        P2P_SOCKET_STUN_TCP_CLIENT : P2P_SOCKET_TCP_CLIENT;
444  }
445  P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_);
446  scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());
447  if (!socket->Init(type, socket_client, local_address,
448                    remote_address))
449    return NULL;
450  return socket.release();
451}
452
453}  // namespace content
454