1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "content/renderer/p2p/ipc_socket_factory.h" 6 7#include <deque> 8 9#include "base/compiler_specific.h" 10#include "base/debug/trace_event.h" 11#include "base/message_loop/message_loop.h" 12#include "base/message_loop/message_loop_proxy.h" 13#include "content/renderer/p2p/socket_client.h" 14#include "content/renderer/p2p/socket_dispatcher.h" 15#include "jingle/glue/utils.h" 16#include "third_party/libjingle/source/talk/base/asyncpacketsocket.h" 17 18namespace content { 19 20namespace { 21 22bool IsTcpClientSocket(P2PSocketType type) { 23 return (type == P2P_SOCKET_STUN_TCP_CLIENT) || 24 (type == P2P_SOCKET_TCP_CLIENT) || 25 (type == P2P_SOCKET_STUN_SSLTCP_CLIENT) || 26 (type == P2P_SOCKET_SSLTCP_CLIENT) || 27 (type == P2P_SOCKET_TLS_CLIENT) || 28 (type == P2P_SOCKET_STUN_TLS_CLIENT); 29} 30 31// TODO(miu): This needs tuning. http://crbug.com/237960 32const size_t kMaximumInFlightBytes = 64 * 1024; // 64 KB 33 34// IpcPacketSocket implements talk_base::AsyncPacketSocket interface 35// using P2PSocketClient that works over IPC-channel. It must be used 36// on the thread it was created. 37class IpcPacketSocket : public talk_base::AsyncPacketSocket, 38 public P2PSocketClient::Delegate { 39 public: 40 IpcPacketSocket(); 41 virtual ~IpcPacketSocket(); 42 43 // Always takes ownership of client even if initialization fails. 44 bool Init(P2PSocketType type, P2PSocketClient* client, 45 const talk_base::SocketAddress& local_address, 46 const talk_base::SocketAddress& remote_address); 47 48 // talk_base::AsyncPacketSocket interface. 49 virtual talk_base::SocketAddress GetLocalAddress() const OVERRIDE; 50 virtual talk_base::SocketAddress GetRemoteAddress() const OVERRIDE; 51 virtual int Send(const void *pv, size_t cb) OVERRIDE; 52 virtual int SendTo(const void *pv, size_t cb, 53 const talk_base::SocketAddress& addr) OVERRIDE; 54 virtual int Close() OVERRIDE; 55 virtual State GetState() const OVERRIDE; 56 virtual int GetOption(talk_base::Socket::Option opt, int* value) OVERRIDE; 57 virtual int SetOption(talk_base::Socket::Option opt, int value) OVERRIDE; 58 virtual int GetError() const OVERRIDE; 59 virtual void SetError(int error) OVERRIDE; 60 61 // P2PSocketClient::Delegate implementation. 62 virtual void OnOpen(const net::IPEndPoint& address) OVERRIDE; 63 virtual void OnIncomingTcpConnection(const net::IPEndPoint& address, 64 P2PSocketClient* client) OVERRIDE; 65 virtual void OnSendComplete() OVERRIDE; 66 virtual void OnError() OVERRIDE; 67 virtual void OnDataReceived(const net::IPEndPoint& address, 68 const std::vector<char>& data) OVERRIDE; 69 70 private: 71 enum InternalState { 72 IS_UNINITIALIZED, 73 IS_OPENING, 74 IS_OPEN, 75 IS_CLOSED, 76 IS_ERROR, 77 }; 78 79 // Update trace of send throttling internal state. This should be called 80 // immediately after any changes to |send_bytes_available_| and/or 81 // |in_flight_packet_sizes_|. 82 void TraceSendThrottlingState() const; 83 84 void InitAcceptedTcp(P2PSocketClient* client, 85 const talk_base::SocketAddress& local_address, 86 const talk_base::SocketAddress& remote_address); 87 P2PSocketType type_; 88 89 // Message loop on which this socket was created and being used. 90 base::MessageLoop* message_loop_; 91 92 // Corresponding P2P socket client. 93 scoped_refptr<P2PSocketClient> client_; 94 95 // Local address is allocated by the browser process, and the 96 // renderer side doesn't know the address until it receives OnOpen() 97 // event from the browser. 98 talk_base::SocketAddress local_address_; 99 100 // Remote address for client TCP connections. 101 talk_base::SocketAddress remote_address_; 102 103 // Current state of the object. 104 InternalState state_; 105 106 // Track the number of bytes allowed to be sent non-blocking. This is used to 107 // throttle the sending of packets to the browser process. For each packet 108 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs 109 // from the browser process) are made, the value is increased back. This 110 // allows short bursts of high-rate sending without dropping packets, but 111 // quickly restricts the client to a sustainable steady-state rate. 112 size_t send_bytes_available_; 113 std::deque<size_t> in_flight_packet_sizes_; 114 115 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the 116 // caller expects SignalWritable notification. 117 bool writable_signal_expected_; 118 119 // Current error code. Valid when state_ == IS_ERROR. 120 int error_; 121 122 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket); 123}; 124 125IpcPacketSocket::IpcPacketSocket() 126 : type_(P2P_SOCKET_UDP), 127 message_loop_(base::MessageLoop::current()), 128 state_(IS_UNINITIALIZED), 129 send_bytes_available_(kMaximumInFlightBytes), 130 writable_signal_expected_(false), 131 error_(0) { 132 COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate); 133} 134 135IpcPacketSocket::~IpcPacketSocket() { 136 if (state_ == IS_OPENING || state_ == IS_OPEN || 137 state_ == IS_ERROR) { 138 Close(); 139 } 140} 141 142void IpcPacketSocket::TraceSendThrottlingState() const { 143 TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(), 144 send_bytes_available_); 145 TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(), 146 in_flight_packet_sizes_.size()); 147} 148 149bool IpcPacketSocket::Init(P2PSocketType type, P2PSocketClient* client, 150 const talk_base::SocketAddress& local_address, 151 const talk_base::SocketAddress& remote_address) { 152 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 153 DCHECK_EQ(state_, IS_UNINITIALIZED); 154 155 type_ = type; 156 client_ = client; 157 local_address_ = local_address; 158 remote_address_ = remote_address; 159 state_ = IS_OPENING; 160 161 net::IPEndPoint local_endpoint; 162 if (!jingle_glue::SocketAddressToIPEndPoint( 163 local_address, &local_endpoint)) { 164 return false; 165 } 166 167 net::IPEndPoint remote_endpoint; 168 if (!remote_address.IsNil() && 169 !jingle_glue::SocketAddressToIPEndPoint( 170 remote_address, &remote_endpoint)) { 171 return false; 172 } 173 174 client_->Init(type, local_endpoint, remote_endpoint, this); 175 176 return true; 177} 178 179void IpcPacketSocket::InitAcceptedTcp( 180 P2PSocketClient* client, 181 const talk_base::SocketAddress& local_address, 182 const talk_base::SocketAddress& remote_address) { 183 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 184 DCHECK_EQ(state_, IS_UNINITIALIZED); 185 186 client_ = client; 187 local_address_ = local_address; 188 remote_address_ = remote_address; 189 state_ = IS_OPEN; 190 TraceSendThrottlingState(); 191 client_->set_delegate(this); 192} 193 194// talk_base::AsyncPacketSocket interface. 195talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const { 196 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 197 return local_address_; 198} 199 200talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const { 201 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 202 return remote_address_; 203} 204 205int IpcPacketSocket::Send(const void *data, size_t data_size) { 206 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 207 return SendTo(data, data_size, remote_address_); 208} 209 210int IpcPacketSocket::SendTo(const void *data, size_t data_size, 211 const talk_base::SocketAddress& address) { 212 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 213 214 switch (state_) { 215 case IS_UNINITIALIZED: 216 NOTREACHED(); 217 return EWOULDBLOCK; 218 case IS_OPENING: 219 return EWOULDBLOCK; 220 case IS_CLOSED: 221 return ENOTCONN; 222 case IS_ERROR: 223 return error_; 224 case IS_OPEN: 225 // Continue sending the packet. 226 break; 227 } 228 229 if (data_size == 0) { 230 NOTREACHED(); 231 return 0; 232 } 233 234 if (data_size > send_bytes_available_) { 235 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock", 236 TRACE_EVENT_SCOPE_THREAD, "id", client_->socket_id()); 237 writable_signal_expected_ = true; 238 error_ = EWOULDBLOCK; 239 return -1; 240 } 241 242 net::IPEndPoint address_chrome; 243 if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) { 244 NOTREACHED(); 245 error_ = EINVAL; 246 return -1; 247 } 248 249 send_bytes_available_ -= data_size; 250 in_flight_packet_sizes_.push_back(data_size); 251 TraceSendThrottlingState(); 252 253 const char* data_char = reinterpret_cast<const char*>(data); 254 std::vector<char> data_vector(data_char, data_char + data_size); 255 client_->Send(address_chrome, data_vector); 256 257 // Fake successful send. The caller ignores result anyway. 258 return data_size; 259} 260 261int IpcPacketSocket::Close() { 262 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 263 264 client_->Close(); 265 state_ = IS_CLOSED; 266 267 return 0; 268} 269 270talk_base::AsyncPacketSocket::State IpcPacketSocket::GetState() const { 271 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 272 273 switch (state_) { 274 case IS_UNINITIALIZED: 275 NOTREACHED(); 276 return STATE_CLOSED; 277 278 case IS_OPENING: 279 return STATE_BINDING; 280 281 case IS_OPEN: 282 if (IsTcpClientSocket(type_)) { 283 return STATE_CONNECTED; 284 } else { 285 return STATE_BOUND; 286 } 287 288 case IS_CLOSED: 289 case IS_ERROR: 290 return STATE_CLOSED; 291 } 292 293 NOTREACHED(); 294 return STATE_CLOSED; 295} 296 297int IpcPacketSocket::GetOption(talk_base::Socket::Option opt, int* value) { 298 // We don't support socket options for IPC sockets. 299 return -1; 300} 301 302int IpcPacketSocket::SetOption(talk_base::Socket::Option opt, int value) { 303 // We don't support socket options for IPC sockets. 304 return -1; 305} 306 307int IpcPacketSocket::GetError() const { 308 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 309 return error_; 310} 311 312void IpcPacketSocket::SetError(int error) { 313 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 314 error_ = error; 315} 316 317void IpcPacketSocket::OnOpen(const net::IPEndPoint& address) { 318 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 319 320 if (!jingle_glue::IPEndPointToSocketAddress(address, &local_address_)) { 321 // Always expect correct IPv4 address to be allocated. 322 NOTREACHED(); 323 OnError(); 324 return; 325 } 326 327 state_ = IS_OPEN; 328 TraceSendThrottlingState(); 329 330 SignalAddressReady(this, local_address_); 331 if (IsTcpClientSocket(type_)) 332 SignalConnect(this); 333} 334 335void IpcPacketSocket::OnIncomingTcpConnection( 336 const net::IPEndPoint& address, 337 P2PSocketClient* client) { 338 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 339 340 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 341 342 talk_base::SocketAddress remote_address; 343 if (!jingle_glue::IPEndPointToSocketAddress(address, &remote_address)) { 344 // Always expect correct IPv4 address to be allocated. 345 NOTREACHED(); 346 } 347 socket->InitAcceptedTcp(client, local_address_, remote_address); 348 SignalNewConnection(this, socket.release()); 349} 350 351void IpcPacketSocket::OnSendComplete() { 352 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 353 354 CHECK(!in_flight_packet_sizes_.empty()); 355 send_bytes_available_ += in_flight_packet_sizes_.front(); 356 DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes); 357 in_flight_packet_sizes_.pop_front(); 358 TraceSendThrottlingState(); 359 360 if (writable_signal_expected_ && send_bytes_available_ > 0) { 361 SignalReadyToSend(this); 362 writable_signal_expected_ = false; 363 } 364} 365 366void IpcPacketSocket::OnError() { 367 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 368 state_ = IS_ERROR; 369 error_ = ECONNABORTED; 370} 371 372void IpcPacketSocket::OnDataReceived(const net::IPEndPoint& address, 373 const std::vector<char>& data) { 374 DCHECK_EQ(base::MessageLoop::current(), message_loop_); 375 376 talk_base::SocketAddress address_lj; 377 if (!jingle_glue::IPEndPointToSocketAddress(address, &address_lj)) { 378 // We should always be able to convert address here because we 379 // don't expect IPv6 address on IPv4 connections. 380 NOTREACHED(); 381 return; 382 } 383 384 SignalReadPacket(this, &data[0], data.size(), address_lj); 385} 386 387} // namespace 388 389IpcPacketSocketFactory::IpcPacketSocketFactory( 390 P2PSocketDispatcher* socket_dispatcher) 391 : socket_dispatcher_(socket_dispatcher) { 392} 393 394IpcPacketSocketFactory::~IpcPacketSocketFactory() { 395} 396 397talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateUdpSocket( 398 const talk_base::SocketAddress& local_address, int min_port, int max_port) { 399 talk_base::SocketAddress crome_address; 400 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); 401 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 402 // TODO(sergeyu): Respect local_address and port limits here (need 403 // to pass them over IPC channel to the browser). 404 if (!socket->Init(P2P_SOCKET_UDP, socket_client, 405 local_address, talk_base::SocketAddress())) { 406 return NULL; 407 } 408 return socket.release(); 409} 410 411talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateServerTcpSocket( 412 const talk_base::SocketAddress& local_address, int min_port, int max_port, 413 int opts) { 414 // TODO(sergeyu): Implement SSL support. 415 if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) 416 return NULL; 417 418 P2PSocketType type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ? 419 P2P_SOCKET_STUN_TCP_SERVER : P2P_SOCKET_TCP_SERVER; 420 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); 421 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 422 if (!socket->Init(type, socket_client, local_address, 423 talk_base::SocketAddress())) { 424 return NULL; 425 } 426 return socket.release(); 427} 428 429talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateClientTcpSocket( 430 const talk_base::SocketAddress& local_address, 431 const talk_base::SocketAddress& remote_address, 432 const talk_base::ProxyInfo& proxy_info, 433 const std::string& user_agent, int opts) { 434 P2PSocketType type; 435 if (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) { 436 type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ? 437 P2P_SOCKET_STUN_SSLTCP_CLIENT : P2P_SOCKET_SSLTCP_CLIENT; 438 } else if (opts & talk_base::PacketSocketFactory::OPT_TLS) { 439 type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ? 440 P2P_SOCKET_STUN_TLS_CLIENT : P2P_SOCKET_TLS_CLIENT; 441 } else { 442 type = (opts & talk_base::PacketSocketFactory::OPT_STUN) ? 443 P2P_SOCKET_STUN_TCP_CLIENT : P2P_SOCKET_TCP_CLIENT; 444 } 445 P2PSocketClient* socket_client = new P2PSocketClient(socket_dispatcher_); 446 scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket()); 447 if (!socket->Init(type, socket_client, local_address, 448 remote_address)) 449 return NULL; 450 return socket.release(); 451} 452 453} // namespace content 454