1// Copyright (c) 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5 6#include "nacl_io/ossocket.h" 7#ifdef PROVIDES_SOCKET_API 8 9#include <assert.h> 10#include <errno.h> 11#include <string.h> 12#include <algorithm> 13 14#include "nacl_io/kernel_handle.h" 15#include "nacl_io/mount_node_tcp.h" 16#include "nacl_io/mount_stream.h" 17#include "nacl_io/pepper_interface.h" 18 19namespace { 20 const size_t kMaxPacketSize = 65536; 21 const size_t kDefaultFifoSize = kMaxPacketSize * 8; 22} 23 24namespace nacl_io { 25 26class TCPWork : public MountStream::Work { 27 public: 28 explicit TCPWork(const ScopedEventEmitterTCP& emitter) 29 : MountStream::Work(emitter->stream()->mount_stream()), 30 emitter_(emitter), 31 data_(NULL) { 32 } 33 34 ~TCPWork() { 35 delete[] data_; 36 } 37 38 TCPSocketInterface* TCPInterface() { 39 return mount()->ppapi()->GetTCPSocketInterface(); 40 } 41 42 protected: 43 ScopedEventEmitterTCP emitter_; 44 char* data_; 45}; 46 47class TCPSendWork : public TCPWork { 48 public: 49 explicit TCPSendWork(const ScopedEventEmitterTCP& emitter, 50 const ScopedMountNodeSocket& stream) 51 : TCPWork(emitter), node_(stream) {} 52 53 virtual bool Start(int32_t val) { 54 AUTO_LOCK(emitter_->GetLock()); 55 56 // Does the stream exist, and can it send? 57 if (!node_->TestStreamFlags(SSF_CAN_SEND)) 58 return false; 59 60 // Check if we are already sending. 61 if (node_->TestStreamFlags(SSF_SENDING)) 62 return false; 63 64 size_t tx_data_avail = emitter_->BytesInOutputFIFO(); 65 int capped_len = std::min(tx_data_avail, kMaxPacketSize); 66 if (capped_len == 0) 67 return false; 68 69 data_ = new char[capped_len]; 70 emitter_->ReadOut_Locked(data_, capped_len); 71 72 int err = TCPInterface()->Write(node_->socket_resource(), 73 data_, 74 capped_len, 75 mount()->GetRunCompletion(this)); 76 77 if (err != PP_OK_COMPLETIONPENDING) { 78 // Anything else, we should assume the socket has gone bad. 79 node_->SetError_Locked(err); 80 return false; 81 } 82 83 node_->SetStreamFlags(SSF_SENDING); 84 return true; 85 } 86 87 virtual void Run(int32_t length_error) { 88 AUTO_LOCK(emitter_->GetLock()); 89 90 if (length_error < 0) { 91 // Send failed, mark the socket as bad 92 node_->SetError_Locked(length_error); 93 return; 94 } 95 96 // If we did send, then Q more work. 97 node_->ClearStreamFlags(SSF_SENDING); 98 node_->QueueOutput(); 99 } 100 101 private: 102 // We assume that transmits will always complete. If the upstream 103 // actually back pressures, enough to prevent the Send callback 104 // from triggering, this resource may never go away. 105 ScopedMountNodeSocket node_; 106}; 107 108class TCPRecvWork : public TCPWork { 109 public: 110 explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter) 111 : TCPWork(emitter) {} 112 113 virtual bool Start(int32_t val) { 114 AUTO_LOCK(emitter_->GetLock()); 115 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); 116 117 // Does the stream exist, and can it recv? 118 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV)) 119 return false; 120 121 // If we are not currently receiving 122 if (stream->TestStreamFlags(SSF_RECVING)) 123 return false; 124 125 size_t rx_space_avail = emitter_->SpaceInInputFIFO(); 126 int capped_len = 127 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize)); 128 129 if (capped_len == 0) 130 return false; 131 132 data_ = new char[capped_len]; 133 int err = TCPInterface()->Read(stream->socket_resource(), 134 data_, 135 capped_len, 136 mount()->GetRunCompletion(this)); 137 if (err != PP_OK_COMPLETIONPENDING) { 138 // Anything else, we should assume the socket has gone bad. 139 stream->SetError_Locked(err); 140 return false; 141 } 142 143 stream->SetStreamFlags(SSF_RECVING); 144 return true; 145 } 146 147 virtual void Run(int32_t length_error) { 148 AUTO_LOCK(emitter_->GetLock()); 149 MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream()); 150 151 if (!stream) 152 return; 153 154 if (length_error <= 0) { 155 stream->SetError_Locked(length_error); 156 return; 157 } 158 159 // If we successfully received, queue more input 160 emitter_->WriteIn_Locked(data_, length_error); 161 stream->ClearStreamFlags(SSF_RECVING); 162 stream->QueueInput(); 163 } 164}; 165 166class TCPAcceptWork : public MountStream::Work { 167 public: 168 explicit TCPAcceptWork(MountStream* stream, 169 const ScopedEventEmitterTCP& emitter) 170 : MountStream::Work(stream), 171 emitter_(emitter) {} 172 173 TCPSocketInterface* TCPInterface() { 174 return mount()->ppapi()->GetTCPSocketInterface(); 175 } 176 177 virtual bool Start(int32_t val) { 178 AUTO_LOCK(emitter_->GetLock()); 179 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream()); 180 181 // Does the stream exist, and can it accept? 182 if (NULL == node) 183 return false; 184 185 // If we are not currently accepting 186 if (!node->TestStreamFlags(SSF_LISTENING)) 187 return false; 188 189 int err = TCPInterface()->Accept(node->socket_resource(), 190 &new_socket_, 191 mount()->GetRunCompletion(this)); 192 193 if (err != PP_OK_COMPLETIONPENDING) { 194 // Anything else, we should assume the socket has gone bad. 195 node->SetError_Locked(err); 196 return false; 197 } 198 199 return true; 200 } 201 202 virtual void Run(int32_t error) { 203 AUTO_LOCK(emitter_->GetLock()); 204 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream()); 205 206 if (node == NULL) 207 return; 208 209 if (error != PP_OK) { 210 node->SetError_Locked(error); 211 return; 212 } 213 214 emitter_->SetAcceptedSocket_Locked(new_socket_); 215 } 216 217 protected: 218 PP_Resource new_socket_; 219 ScopedEventEmitterTCP emitter_; 220}; 221 222class TCPConnectWork : public MountStream::Work { 223 public: 224 explicit TCPConnectWork(MountStream* stream, 225 const ScopedEventEmitterTCP& emitter) 226 : MountStream::Work(stream), 227 emitter_(emitter) {} 228 229 TCPSocketInterface* TCPInterface() { 230 return mount()->ppapi()->GetTCPSocketInterface(); 231 } 232 233 virtual bool Start(int32_t val) { 234 AUTO_LOCK(emitter_->GetLock()); 235 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream()); 236 237 // Does the stream exist, and can it connect? 238 if (NULL == node) 239 return false; 240 241 int err = TCPInterface()->Connect(node->socket_resource(), 242 node->remote_addr(), 243 mount()->GetRunCompletion(this)); 244 if (err != PP_OK_COMPLETIONPENDING) { 245 // Anything else, we should assume the socket has gone bad. 246 node->SetError_Locked(err); 247 return false; 248 } 249 250 return true; 251 } 252 253 virtual void Run(int32_t error) { 254 AUTO_LOCK(emitter_->GetLock()); 255 MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream()); 256 257 if (node == NULL) 258 return; 259 260 if (error != PP_OK) { 261 node->ConnectFailed_Locked(); 262 node->SetError_Locked(error); 263 return; 264 } 265 266 node->ConnectDone_Locked(); 267 } 268 269 protected: 270 ScopedEventEmitterTCP emitter_; 271}; 272 273MountNodeTCP::MountNodeTCP(Mount* mount) 274 : MountNodeSocket(mount), 275 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)), 276 tcp_nodelay_(false) { 277 emitter_->AttachStream(this); 278} 279 280MountNodeTCP::MountNodeTCP(Mount* mount, PP_Resource socket) 281 : MountNodeSocket(mount, socket), 282 emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)), 283 tcp_nodelay_(false) { 284 emitter_->AttachStream(this); 285} 286 287void MountNodeTCP::Destroy() { 288 emitter_->DetachStream(); 289 MountNodeSocket::Destroy(); 290} 291 292Error MountNodeTCP::Init(int open_flags) { 293 Error err = MountNodeSocket::Init(open_flags); 294 if (err != 0) 295 return err; 296 297 if (TCPInterface() == NULL) 298 return EACCES; 299 300 if (socket_resource_ != 0) { 301 // TCP sockets that are contructed with an existing socket_resource_ 302 // are those that generated from calls to Accept() and therefore are 303 // already connected. 304 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_); 305 ConnectDone_Locked(); 306 } else { 307 socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance()); 308 if (0 == socket_resource_) 309 return EACCES; 310 SetStreamFlags(SSF_CAN_CONNECT); 311 } 312 313 return 0; 314} 315 316EventEmitter* MountNodeTCP::GetEventEmitter() { 317 return emitter_.get(); 318} 319 320void MountNodeTCP::SetError_Locked(int pp_error_num) { 321 MountNodeSocket::SetError_Locked(pp_error_num); 322 emitter_->SetError_Locked(); 323} 324 325Error MountNodeTCP::GetSockOpt(int lvl, 326 int optname, 327 void* optval, 328 socklen_t* len) { 329 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) { 330 AUTO_LOCK(node_lock_); 331 int value = tcp_nodelay_; 332 socklen_t value_len = sizeof(value); 333 int copy_bytes = std::min(value_len, *len); 334 memcpy(optval, &value, copy_bytes); 335 *len = value_len; 336 return 0; 337 } 338 339 return MountNodeSocket::GetSockOpt(lvl, optname, optval, len); 340} 341 342 343Error MountNodeTCP::SetNoDelay_Locked() { 344 if (!IsConnected()) 345 return 0; 346 347 int32_t error = TCPInterface()->SetOption(socket_resource_, 348 PP_TCPSOCKET_OPTION_NO_DELAY, 349 PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE), 350 PP_BlockUntilComplete()); 351 return PPErrorToErrno(error); 352} 353 354Error MountNodeTCP::SetSockOpt(int lvl, 355 int optname, 356 const void* optval, 357 socklen_t len) { 358 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) { 359 if (len < sizeof(int)) 360 return EINVAL; 361 AUTO_LOCK(node_lock_); 362 tcp_nodelay_ = *static_cast<const int*>(optval) != 0; 363 return SetNoDelay_Locked(); 364 } 365 366 return MountNodeSocket::SetSockOpt(lvl, optname, optval, len); 367} 368 369void MountNodeTCP::QueueAccept() { 370 MountStream::Work* work = new TCPAcceptWork(mount_stream(), emitter_); 371 mount_stream()->EnqueueWork(work); 372} 373 374void MountNodeTCP::QueueConnect() { 375 MountStream::Work* work = new TCPConnectWork(mount_stream(), emitter_); 376 mount_stream()->EnqueueWork(work); 377} 378 379void MountNodeTCP::QueueInput() { 380 MountStream::Work* work = new TCPRecvWork(emitter_); 381 mount_stream()->EnqueueWork(work); 382} 383 384void MountNodeTCP::QueueOutput() { 385 if (TestStreamFlags(SSF_SENDING)) 386 return; 387 388 if (!TestStreamFlags(SSF_CAN_SEND)) 389 return; 390 391 if (0 == emitter_->BytesInOutputFIFO()) 392 return; 393 394 MountStream::Work* work = new TCPSendWork(emitter_, 395 ScopedMountNodeSocket(this)); 396 mount_stream()->EnqueueWork(work); 397} 398 399Error MountNodeTCP::Accept(const HandleAttr& attr, 400 PP_Resource* out_sock, 401 struct sockaddr* addr, 402 socklen_t* len) { 403 EventListenerLock wait(GetEventEmitter()); 404 405 if (!TestStreamFlags(SSF_LISTENING)) 406 return EINVAL; 407 408 // Either block forever or not at all 409 int ms = attr.IsBlocking() ? -1 : 0; 410 411 Error err = wait.WaitOnEvent(POLLIN, ms); 412 if (ETIMEDOUT == err) 413 return EWOULDBLOCK; 414 415 int s = emitter_->GetAcceptedSocket_Locked(); 416 // Non-blocking case. 417 if (s == 0) 418 return EAGAIN; 419 420 // Consume the new socket and start listening for the next one 421 *out_sock = s; 422 emitter_->ClearEvents_Locked(POLLIN); 423 424 // Set the out paramaters 425 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock); 426 *len = ResourceToSockAddr(remote_addr, *len, addr); 427 mount_->ppapi()->ReleaseResource(remote_addr); 428 429 QueueAccept(); 430 return 0; 431} 432 433// We can not bind a client socket with PPAPI. For now we ignore the 434// bind but report the correct address later, just in case someone is 435// binding without really caring what the address is (for example to 436// select a more optimized interface/route.) 437Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) { 438 AUTO_LOCK(node_lock_); 439 440 /* Only bind once. */ 441 if (IsBound()) 442 return EINVAL; 443 444 local_addr_ = SockAddrToResource(addr, len); 445 int err = TCPInterface()->Bind(socket_resource_, 446 local_addr_, 447 PP_BlockUntilComplete()); 448 449 // If we fail, release the local addr resource 450 if (err != PP_OK) { 451 mount_->ppapi()->ReleaseResource(local_addr_); 452 local_addr_ = 0; 453 return PPErrorToErrno(err); 454 } 455 456 return 0; 457} 458 459Error MountNodeTCP::Connect(const HandleAttr& attr, 460 const struct sockaddr* addr, 461 socklen_t len) { 462 EventListenerLock wait(GetEventEmitter()); 463 464 if (TestStreamFlags(SSF_CONNECTING)) 465 return EALREADY; 466 467 if (IsConnected()) 468 return EISCONN; 469 470 remote_addr_ = SockAddrToResource(addr, len); 471 if (0 == remote_addr_) 472 return EINVAL; 473 474 int ms = attr.IsBlocking() ? -1 : 0; 475 476 SetStreamFlags(SSF_CONNECTING); 477 QueueConnect(); 478 479 Error err = wait.WaitOnEvent(POLLOUT, ms); 480 if (ETIMEDOUT == err) 481 return EINPROGRESS; 482 483 // If we fail, release the dest addr resource 484 if (err != 0) { 485 ConnectFailed_Locked(); 486 return err; 487 } 488 489 ConnectDone_Locked(); 490 return 0; 491} 492 493Error MountNodeTCP::Shutdown(int how) { 494 AUTO_LOCK(node_lock_); 495 if (!IsConnected()) 496 return ENOTCONN; 497 { 498 AUTO_LOCK(emitter_->GetLock()); 499 emitter_->SetError_Locked(); 500 } 501 return 0; 502} 503 504void MountNodeTCP::ConnectDone_Locked() { 505 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_); 506 507 // Now that we are connected, we can start sending and receiving. 508 ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT); 509 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV); 510 511 emitter_->ConnectDone_Locked(); 512 513 // The NODELAY option cannot be set in PPAPI before the socket 514 // is connected, but setsockopt() might have already set it. 515 SetNoDelay_Locked(); 516 517 // Begin the input pump 518 QueueInput(); 519} 520 521void MountNodeTCP::ConnectFailed_Locked() { 522 mount_->ppapi()->ReleaseResource(remote_addr_); 523 remote_addr_ = 0; 524} 525 526Error MountNodeTCP::Listen(int backlog) { 527 AUTO_LOCK(node_lock_); 528 if (!IsBound()) 529 return EINVAL; 530 531 int err = TCPInterface()->Listen(socket_resource_, 532 backlog, 533 PP_BlockUntilComplete()); 534 if (err != PP_OK) 535 return PPErrorToErrno(err); 536 537 ClearStreamFlags(SSF_CAN_CONNECT); 538 SetStreamFlags(SSF_LISTENING); 539 emitter_->SetListening_Locked(); 540 QueueAccept(); 541 return 0; 542} 543 544Error MountNodeTCP::Recv_Locked(void* buf, 545 size_t len, 546 PP_Resource* out_addr, 547 int* out_len) { 548 assert(emitter_.get()); 549 *out_len = emitter_->ReadIn_Locked((char*)buf, len); 550 *out_addr = remote_addr_; 551 552 // Ref the address copy we pass back. 553 mount_->ppapi()->AddRefResource(remote_addr_); 554 return 0; 555} 556 557// TCP ignores dst addr passed to send_to, and always uses bound address 558Error MountNodeTCP::Send_Locked(const void* buf, 559 size_t len, 560 PP_Resource, 561 int* out_len) { 562 assert(emitter_.get()); 563 if (emitter_->GetError_Locked()) 564 return EPIPE; 565 *out_len = emitter_->WriteOut_Locked((char*)buf, len); 566 return 0; 567} 568 569 570} // namespace nacl_io 571 572#endif // PROVIDES_SOCKET_API 573