1/* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11#if defined(_MSC_VER) && _MSC_VER < 1300 12#pragma warning(disable:4786) 13#endif 14 15#include <assert.h> 16 17#ifdef MEMORY_SANITIZER 18#include <sanitizer/msan_interface.h> 19#endif 20 21#if defined(WEBRTC_POSIX) 22#include <string.h> 23#include <errno.h> 24#include <fcntl.h> 25#include <sys/time.h> 26#include <sys/select.h> 27#include <unistd.h> 28#include <signal.h> 29#endif 30 31#if defined(WEBRTC_WIN) 32#define WIN32_LEAN_AND_MEAN 33#include <windows.h> 34#include <winsock2.h> 35#include <ws2tcpip.h> 36#undef SetPort 37#endif 38 39#include <algorithm> 40#include <map> 41 42#include "webrtc/base/arraysize.h" 43#include "webrtc/base/basictypes.h" 44#include "webrtc/base/byteorder.h" 45#include "webrtc/base/common.h" 46#include "webrtc/base/logging.h" 47#include "webrtc/base/physicalsocketserver.h" 48#include "webrtc/base/timeutils.h" 49#include "webrtc/base/winping.h" 50#include "webrtc/base/win32socketinit.h" 51 52// stm: this will tell us if we are on OSX 53#ifdef HAVE_CONFIG_H 54#include "config.h" 55#endif 56 57#if defined(WEBRTC_POSIX) 58#include <netinet/tcp.h> // for TCP_NODELAY 59#define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h 60typedef void* SockOptArg; 61#endif // WEBRTC_POSIX 62 63#if defined(WEBRTC_WIN) 64typedef char* SockOptArg; 65#endif 66 67namespace rtc { 68 69#if defined(WEBRTC_WIN) 70// Standard MTUs, from RFC 1191 71const uint16_t PACKET_MAXIMUMS[] = { 72 65535, // Theoretical maximum, Hyperchannel 73 32000, // Nothing 74 17914, // 16Mb IBM Token Ring 75 8166, // IEEE 802.4 76 // 4464, // IEEE 802.5 (4Mb max) 77 4352, // FDDI 78 // 2048, // Wideband Network 79 2002, // IEEE 802.5 (4Mb recommended) 80 // 1536, // Expermental Ethernet Networks 81 // 1500, // Ethernet, Point-to-Point (default) 82 1492, // IEEE 802.3 83 1006, // SLIP, ARPANET 84 // 576, // X.25 Networks 85 // 544, // DEC IP Portal 86 // 512, // NETBIOS 87 508, // IEEE 802/Source-Rt Bridge, ARCNET 88 296, // Point-to-Point (low delay) 89 68, // Official minimum 90 0, // End of list marker 91}; 92 93static const int IP_HEADER_SIZE = 20u; 94static const int IPV6_HEADER_SIZE = 40u; 95static const int ICMP_HEADER_SIZE = 8u; 96static const int ICMP_PING_TIMEOUT_MILLIS = 10000u; 97#endif 98 99PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s) 100 : ss_(ss), s_(s), enabled_events_(0), error_(0), 101 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), 102 resolver_(nullptr) { 103#if defined(WEBRTC_WIN) 104 // EnsureWinsockInit() ensures that winsock is initialized. The default 105 // version of this function doesn't do anything because winsock is 106 // initialized by constructor of a static object. If neccessary libjingle 107 // users can link it with a different version of this function by replacing 108 // win32socketinit.cc. See win32socketinit.cc for more details. 109 EnsureWinsockInit(); 110#endif 111 if (s_ != INVALID_SOCKET) { 112 enabled_events_ = DE_READ | DE_WRITE; 113 114 int type = SOCK_STREAM; 115 socklen_t len = sizeof(type); 116 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len)); 117 udp_ = (SOCK_DGRAM == type); 118 } 119} 120 121PhysicalSocket::~PhysicalSocket() { 122 Close(); 123} 124 125bool PhysicalSocket::Create(int family, int type) { 126 Close(); 127 s_ = ::socket(family, type, 0); 128 udp_ = (SOCK_DGRAM == type); 129 UpdateLastError(); 130 if (udp_) 131 enabled_events_ = DE_READ | DE_WRITE; 132 return s_ != INVALID_SOCKET; 133} 134 135SocketAddress PhysicalSocket::GetLocalAddress() const { 136 sockaddr_storage addr_storage = {0}; 137 socklen_t addrlen = sizeof(addr_storage); 138 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 139 int result = ::getsockname(s_, addr, &addrlen); 140 SocketAddress address; 141 if (result >= 0) { 142 SocketAddressFromSockAddrStorage(addr_storage, &address); 143 } else { 144 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket=" 145 << s_; 146 } 147 return address; 148} 149 150SocketAddress PhysicalSocket::GetRemoteAddress() const { 151 sockaddr_storage addr_storage = {0}; 152 socklen_t addrlen = sizeof(addr_storage); 153 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 154 int result = ::getpeername(s_, addr, &addrlen); 155 SocketAddress address; 156 if (result >= 0) { 157 SocketAddressFromSockAddrStorage(addr_storage, &address); 158 } else { 159 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket=" 160 << s_; 161 } 162 return address; 163} 164 165int PhysicalSocket::Bind(const SocketAddress& bind_addr) { 166 sockaddr_storage addr_storage; 167 size_t len = bind_addr.ToSockAddrStorage(&addr_storage); 168 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 169 int err = ::bind(s_, addr, static_cast<int>(len)); 170 UpdateLastError(); 171#if !defined(NDEBUG) 172 if (0 == err) { 173 dbg_addr_ = "Bound @ "; 174 dbg_addr_.append(GetLocalAddress().ToString()); 175 } 176#endif 177 return err; 178} 179 180int PhysicalSocket::Connect(const SocketAddress& addr) { 181 // TODO(pthatcher): Implicit creation is required to reconnect... 182 // ...but should we make it more explicit? 183 if (state_ != CS_CLOSED) { 184 SetError(EALREADY); 185 return SOCKET_ERROR; 186 } 187 if (addr.IsUnresolvedIP()) { 188 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect"; 189 resolver_ = new AsyncResolver(); 190 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult); 191 resolver_->Start(addr); 192 state_ = CS_CONNECTING; 193 return 0; 194 } 195 196 return DoConnect(addr); 197} 198 199int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) { 200 if ((s_ == INVALID_SOCKET) && 201 !Create(connect_addr.family(), SOCK_STREAM)) { 202 return SOCKET_ERROR; 203 } 204 sockaddr_storage addr_storage; 205 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); 206 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 207 int err = ::connect(s_, addr, static_cast<int>(len)); 208 UpdateLastError(); 209 if (err == 0) { 210 state_ = CS_CONNECTED; 211 } else if (IsBlockingError(GetError())) { 212 state_ = CS_CONNECTING; 213 enabled_events_ |= DE_CONNECT; 214 } else { 215 return SOCKET_ERROR; 216 } 217 218 enabled_events_ |= DE_READ | DE_WRITE; 219 return 0; 220} 221 222int PhysicalSocket::GetError() const { 223 CritScope cs(&crit_); 224 return error_; 225} 226 227void PhysicalSocket::SetError(int error) { 228 CritScope cs(&crit_); 229 error_ = error; 230} 231 232AsyncSocket::ConnState PhysicalSocket::GetState() const { 233 return state_; 234} 235 236int PhysicalSocket::GetOption(Option opt, int* value) { 237 int slevel; 238 int sopt; 239 if (TranslateOption(opt, &slevel, &sopt) == -1) 240 return -1; 241 socklen_t optlen = sizeof(*value); 242 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen); 243 if (ret != -1 && opt == OPT_DONTFRAGMENT) { 244#if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 245 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0; 246#endif 247 } 248 return ret; 249} 250 251int PhysicalSocket::SetOption(Option opt, int value) { 252 int slevel; 253 int sopt; 254 if (TranslateOption(opt, &slevel, &sopt) == -1) 255 return -1; 256 if (opt == OPT_DONTFRAGMENT) { 257#if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 258 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT; 259#endif 260 } 261 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value)); 262} 263 264int PhysicalSocket::Send(const void* pv, size_t cb) { 265 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb, 266#if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 267 // Suppress SIGPIPE. Without this, attempting to send on a socket whose 268 // other end is closed will result in a SIGPIPE signal being raised to 269 // our process, which by default will terminate the process, which we 270 // don't want. By specifying this flag, we'll just get the error EPIPE 271 // instead and can handle the error gracefully. 272 MSG_NOSIGNAL 273#else 274 0 275#endif 276 ); 277 UpdateLastError(); 278 MaybeRemapSendError(); 279 // We have seen minidumps where this may be false. 280 ASSERT(sent <= static_cast<int>(cb)); 281 if ((sent < 0) && IsBlockingError(GetError())) { 282 enabled_events_ |= DE_WRITE; 283 } 284 return sent; 285} 286 287int PhysicalSocket::SendTo(const void* buffer, 288 size_t length, 289 const SocketAddress& addr) { 290 sockaddr_storage saddr; 291 size_t len = addr.ToSockAddrStorage(&saddr); 292 int sent = ::sendto( 293 s_, static_cast<const char *>(buffer), static_cast<int>(length), 294#if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 295 // Suppress SIGPIPE. See above for explanation. 296 MSG_NOSIGNAL, 297#else 298 0, 299#endif 300 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len)); 301 UpdateLastError(); 302 MaybeRemapSendError(); 303 // We have seen minidumps where this may be false. 304 ASSERT(sent <= static_cast<int>(length)); 305 if ((sent < 0) && IsBlockingError(GetError())) { 306 enabled_events_ |= DE_WRITE; 307 } 308 return sent; 309} 310 311int PhysicalSocket::Recv(void* buffer, size_t length) { 312 int received = ::recv(s_, static_cast<char*>(buffer), 313 static_cast<int>(length), 0); 314 if ((received == 0) && (length != 0)) { 315 // Note: on graceful shutdown, recv can return 0. In this case, we 316 // pretend it is blocking, and then signal close, so that simplifying 317 // assumptions can be made about Recv. 318 LOG(LS_WARNING) << "EOF from socket; deferring close event"; 319 // Must turn this back on so that the select() loop will notice the close 320 // event. 321 enabled_events_ |= DE_READ; 322 SetError(EWOULDBLOCK); 323 return SOCKET_ERROR; 324 } 325 UpdateLastError(); 326 int error = GetError(); 327 bool success = (received >= 0) || IsBlockingError(error); 328 if (udp_ || success) { 329 enabled_events_ |= DE_READ; 330 } 331 if (!success) { 332 LOG_F(LS_VERBOSE) << "Error = " << error; 333 } 334 return received; 335} 336 337int PhysicalSocket::RecvFrom(void* buffer, 338 size_t length, 339 SocketAddress* out_addr) { 340 sockaddr_storage addr_storage; 341 socklen_t addr_len = sizeof(addr_storage); 342 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 343 int received = ::recvfrom(s_, static_cast<char*>(buffer), 344 static_cast<int>(length), 0, addr, &addr_len); 345 UpdateLastError(); 346 if ((received >= 0) && (out_addr != nullptr)) 347 SocketAddressFromSockAddrStorage(addr_storage, out_addr); 348 int error = GetError(); 349 bool success = (received >= 0) || IsBlockingError(error); 350 if (udp_ || success) { 351 enabled_events_ |= DE_READ; 352 } 353 if (!success) { 354 LOG_F(LS_VERBOSE) << "Error = " << error; 355 } 356 return received; 357} 358 359int PhysicalSocket::Listen(int backlog) { 360 int err = ::listen(s_, backlog); 361 UpdateLastError(); 362 if (err == 0) { 363 state_ = CS_CONNECTING; 364 enabled_events_ |= DE_ACCEPT; 365#if !defined(NDEBUG) 366 dbg_addr_ = "Listening @ "; 367 dbg_addr_.append(GetLocalAddress().ToString()); 368#endif 369 } 370 return err; 371} 372 373AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) { 374 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will 375 // trigger an event even if DoAccept returns an error here. 376 enabled_events_ |= DE_ACCEPT; 377 sockaddr_storage addr_storage; 378 socklen_t addr_len = sizeof(addr_storage); 379 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 380 SOCKET s = DoAccept(s_, addr, &addr_len); 381 UpdateLastError(); 382 if (s == INVALID_SOCKET) 383 return nullptr; 384 if (out_addr != nullptr) 385 SocketAddressFromSockAddrStorage(addr_storage, out_addr); 386 return ss_->WrapSocket(s); 387} 388 389int PhysicalSocket::Close() { 390 if (s_ == INVALID_SOCKET) 391 return 0; 392 int err = ::closesocket(s_); 393 UpdateLastError(); 394 s_ = INVALID_SOCKET; 395 state_ = CS_CLOSED; 396 enabled_events_ = 0; 397 if (resolver_) { 398 resolver_->Destroy(false); 399 resolver_ = nullptr; 400 } 401 return err; 402} 403 404int PhysicalSocket::EstimateMTU(uint16_t* mtu) { 405 SocketAddress addr = GetRemoteAddress(); 406 if (addr.IsAnyIP()) { 407 SetError(ENOTCONN); 408 return -1; 409 } 410 411#if defined(WEBRTC_WIN) 412 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|. 413 WinPing ping; 414 if (!ping.IsValid()) { 415 SetError(EINVAL); // can't think of a better error ID 416 return -1; 417 } 418 int header_size = ICMP_HEADER_SIZE; 419 if (addr.family() == AF_INET6) { 420 header_size += IPV6_HEADER_SIZE; 421 } else if (addr.family() == AF_INET) { 422 header_size += IP_HEADER_SIZE; 423 } 424 425 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) { 426 int32_t size = PACKET_MAXIMUMS[level] - header_size; 427 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size, 428 ICMP_PING_TIMEOUT_MILLIS, 429 1, false); 430 if (result == WinPing::PING_FAIL) { 431 SetError(EINVAL); // can't think of a better error ID 432 return -1; 433 } else if (result != WinPing::PING_TOO_LARGE) { 434 *mtu = PACKET_MAXIMUMS[level]; 435 return 0; 436 } 437 } 438 439 ASSERT(false); 440 return -1; 441#elif defined(WEBRTC_MAC) 442 // No simple way to do this on Mac OS X. 443 // SIOCGIFMTU would work if we knew which interface would be used, but 444 // figuring that out is pretty complicated. For now we'll return an error 445 // and let the caller pick a default MTU. 446 SetError(EINVAL); 447 return -1; 448#elif defined(WEBRTC_LINUX) 449 // Gets the path MTU. 450 int value; 451 socklen_t vlen = sizeof(value); 452 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen); 453 if (err < 0) { 454 UpdateLastError(); 455 return err; 456 } 457 458 ASSERT((0 <= value) && (value <= 65536)); 459 *mtu = value; 460 return 0; 461#elif defined(__native_client__) 462 // Most socket operations, including this, will fail in NaCl's sandbox. 463 error_ = EACCES; 464 return -1; 465#endif 466} 467 468 469SOCKET PhysicalSocket::DoAccept(SOCKET socket, 470 sockaddr* addr, 471 socklen_t* addrlen) { 472 return ::accept(socket, addr, addrlen); 473} 474 475void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) { 476 if (resolver != resolver_) { 477 return; 478 } 479 480 int error = resolver_->GetError(); 481 if (error == 0) { 482 error = DoConnect(resolver_->address()); 483 } else { 484 Close(); 485 } 486 487 if (error) { 488 SetError(error); 489 SignalCloseEvent(this, error); 490 } 491} 492 493void PhysicalSocket::UpdateLastError() { 494 SetError(LAST_SYSTEM_ERROR); 495} 496 497void PhysicalSocket::MaybeRemapSendError() { 498#if defined(WEBRTC_MAC) 499 // https://developer.apple.com/library/mac/documentation/Darwin/ 500 // Reference/ManPages/man2/sendto.2.html 501 // ENOBUFS - The output queue for a network interface is full. 502 // This generally indicates that the interface has stopped sending, 503 // but may be caused by transient congestion. 504 if (GetError() == ENOBUFS) { 505 SetError(EWOULDBLOCK); 506 } 507#endif 508} 509 510int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) { 511 switch (opt) { 512 case OPT_DONTFRAGMENT: 513#if defined(WEBRTC_WIN) 514 *slevel = IPPROTO_IP; 515 *sopt = IP_DONTFRAGMENT; 516 break; 517#elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__) 518 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported."; 519 return -1; 520#elif defined(WEBRTC_POSIX) 521 *slevel = IPPROTO_IP; 522 *sopt = IP_MTU_DISCOVER; 523 break; 524#endif 525 case OPT_RCVBUF: 526 *slevel = SOL_SOCKET; 527 *sopt = SO_RCVBUF; 528 break; 529 case OPT_SNDBUF: 530 *slevel = SOL_SOCKET; 531 *sopt = SO_SNDBUF; 532 break; 533 case OPT_NODELAY: 534 *slevel = IPPROTO_TCP; 535 *sopt = TCP_NODELAY; 536 break; 537 case OPT_DSCP: 538 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported."; 539 return -1; 540 case OPT_RTP_SENDTIME_EXTN_ID: 541 return -1; // No logging is necessary as this not a OS socket option. 542 default: 543 ASSERT(false); 544 return -1; 545 } 546 return 0; 547} 548 549SocketDispatcher::SocketDispatcher(PhysicalSocketServer *ss) 550#if defined(WEBRTC_WIN) 551 : PhysicalSocket(ss), id_(0), signal_close_(false) 552#else 553 : PhysicalSocket(ss) 554#endif 555{ 556} 557 558SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) 559#if defined(WEBRTC_WIN) 560 : PhysicalSocket(ss, s), id_(0), signal_close_(false) 561#else 562 : PhysicalSocket(ss, s) 563#endif 564{ 565} 566 567SocketDispatcher::~SocketDispatcher() { 568 Close(); 569} 570 571bool SocketDispatcher::Initialize() { 572 ASSERT(s_ != INVALID_SOCKET); 573 // Must be a non-blocking 574#if defined(WEBRTC_WIN) 575 u_long argp = 1; 576 ioctlsocket(s_, FIONBIO, &argp); 577#elif defined(WEBRTC_POSIX) 578 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK); 579#endif 580 ss_->Add(this); 581 return true; 582} 583 584bool SocketDispatcher::Create(int type) { 585 return Create(AF_INET, type); 586} 587 588bool SocketDispatcher::Create(int family, int type) { 589 // Change the socket to be non-blocking. 590 if (!PhysicalSocket::Create(family, type)) 591 return false; 592 593 if (!Initialize()) 594 return false; 595 596#if defined(WEBRTC_WIN) 597 do { id_ = ++next_id_; } while (id_ == 0); 598#endif 599 return true; 600} 601 602#if defined(WEBRTC_WIN) 603 604WSAEVENT SocketDispatcher::GetWSAEvent() { 605 return WSA_INVALID_EVENT; 606} 607 608SOCKET SocketDispatcher::GetSocket() { 609 return s_; 610} 611 612bool SocketDispatcher::CheckSignalClose() { 613 if (!signal_close_) 614 return false; 615 616 char ch; 617 if (recv(s_, &ch, 1, MSG_PEEK) > 0) 618 return false; 619 620 state_ = CS_CLOSED; 621 signal_close_ = false; 622 SignalCloseEvent(this, signal_err_); 623 return true; 624} 625 626int SocketDispatcher::next_id_ = 0; 627 628#elif defined(WEBRTC_POSIX) 629 630int SocketDispatcher::GetDescriptor() { 631 return s_; 632} 633 634bool SocketDispatcher::IsDescriptorClosed() { 635 // We don't have a reliable way of distinguishing end-of-stream 636 // from readability. So test on each readable call. Is this 637 // inefficient? Probably. 638 char ch; 639 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK); 640 if (res > 0) { 641 // Data available, so not closed. 642 return false; 643 } else if (res == 0) { 644 // EOF, so closed. 645 return true; 646 } else { // error 647 switch (errno) { 648 // Returned if we've already closed s_. 649 case EBADF: 650 // Returned during ungraceful peer shutdown. 651 case ECONNRESET: 652 return true; 653 default: 654 // Assume that all other errors are just blocking errors, meaning the 655 // connection is still good but we just can't read from it right now. 656 // This should only happen when connecting (and at most once), because 657 // in all other cases this function is only called if the file 658 // descriptor is already known to be in the readable state. However, 659 // it's not necessary a problem if we spuriously interpret a 660 // "connection lost"-type error as a blocking error, because typically 661 // the next recv() will get EOF, so we'll still eventually notice that 662 // the socket is closed. 663 LOG_ERR(LS_WARNING) << "Assuming benign blocking error"; 664 return false; 665 } 666 } 667} 668 669#endif // WEBRTC_POSIX 670 671uint32_t SocketDispatcher::GetRequestedEvents() { 672 return enabled_events_; 673} 674 675void SocketDispatcher::OnPreEvent(uint32_t ff) { 676 if ((ff & DE_CONNECT) != 0) 677 state_ = CS_CONNECTED; 678 679#if defined(WEBRTC_WIN) 680 // We set CS_CLOSED from CheckSignalClose. 681#elif defined(WEBRTC_POSIX) 682 if ((ff & DE_CLOSE) != 0) 683 state_ = CS_CLOSED; 684#endif 685} 686 687#if defined(WEBRTC_WIN) 688 689void SocketDispatcher::OnEvent(uint32_t ff, int err) { 690 int cache_id = id_; 691 // Make sure we deliver connect/accept first. Otherwise, consumers may see 692 // something like a READ followed by a CONNECT, which would be odd. 693 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { 694 if (ff != DE_CONNECT) 695 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; 696 enabled_events_ &= ~DE_CONNECT; 697#if !defined(NDEBUG) 698 dbg_addr_ = "Connected @ "; 699 dbg_addr_.append(GetRemoteAddress().ToString()); 700#endif 701 SignalConnectEvent(this); 702 } 703 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { 704 enabled_events_ &= ~DE_ACCEPT; 705 SignalReadEvent(this); 706 } 707 if ((ff & DE_READ) != 0) { 708 enabled_events_ &= ~DE_READ; 709 SignalReadEvent(this); 710 } 711 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { 712 enabled_events_ &= ~DE_WRITE; 713 SignalWriteEvent(this); 714 } 715 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { 716 signal_close_ = true; 717 signal_err_ = err; 718 } 719} 720 721#elif defined(WEBRTC_POSIX) 722 723void SocketDispatcher::OnEvent(uint32_t ff, int err) { 724 // Make sure we deliver connect/accept first. Otherwise, consumers may see 725 // something like a READ followed by a CONNECT, which would be odd. 726 if ((ff & DE_CONNECT) != 0) { 727 enabled_events_ &= ~DE_CONNECT; 728 SignalConnectEvent(this); 729 } 730 if ((ff & DE_ACCEPT) != 0) { 731 enabled_events_ &= ~DE_ACCEPT; 732 SignalReadEvent(this); 733 } 734 if ((ff & DE_READ) != 0) { 735 enabled_events_ &= ~DE_READ; 736 SignalReadEvent(this); 737 } 738 if ((ff & DE_WRITE) != 0) { 739 enabled_events_ &= ~DE_WRITE; 740 SignalWriteEvent(this); 741 } 742 if ((ff & DE_CLOSE) != 0) { 743 // The socket is now dead to us, so stop checking it. 744 enabled_events_ = 0; 745 SignalCloseEvent(this, err); 746 } 747} 748 749#endif // WEBRTC_POSIX 750 751int SocketDispatcher::Close() { 752 if (s_ == INVALID_SOCKET) 753 return 0; 754 755#if defined(WEBRTC_WIN) 756 id_ = 0; 757 signal_close_ = false; 758#endif 759 ss_->Remove(this); 760 return PhysicalSocket::Close(); 761} 762 763#if defined(WEBRTC_POSIX) 764class EventDispatcher : public Dispatcher { 765 public: 766 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) { 767 if (pipe(afd_) < 0) 768 LOG(LERROR) << "pipe failed"; 769 ss_->Add(this); 770 } 771 772 ~EventDispatcher() override { 773 ss_->Remove(this); 774 close(afd_[0]); 775 close(afd_[1]); 776 } 777 778 virtual void Signal() { 779 CritScope cs(&crit_); 780 if (!fSignaled_) { 781 const uint8_t b[1] = {0}; 782 if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) { 783 fSignaled_ = true; 784 } 785 } 786 } 787 788 uint32_t GetRequestedEvents() override { return DE_READ; } 789 790 void OnPreEvent(uint32_t ff) override { 791 // It is not possible to perfectly emulate an auto-resetting event with 792 // pipes. This simulates it by resetting before the event is handled. 793 794 CritScope cs(&crit_); 795 if (fSignaled_) { 796 uint8_t b[4]; // Allow for reading more than 1 byte, but expect 1. 797 VERIFY(1 == read(afd_[0], b, sizeof(b))); 798 fSignaled_ = false; 799 } 800 } 801 802 void OnEvent(uint32_t ff, int err) override { ASSERT(false); } 803 804 int GetDescriptor() override { return afd_[0]; } 805 806 bool IsDescriptorClosed() override { return false; } 807 808 private: 809 PhysicalSocketServer *ss_; 810 int afd_[2]; 811 bool fSignaled_; 812 CriticalSection crit_; 813}; 814 815// These two classes use the self-pipe trick to deliver POSIX signals to our 816// select loop. This is the only safe, reliable, cross-platform way to do 817// non-trivial things with a POSIX signal in an event-driven program (until 818// proper pselect() implementations become ubiquitous). 819 820class PosixSignalHandler { 821 public: 822 // POSIX only specifies 32 signals, but in principle the system might have 823 // more and the programmer might choose to use them, so we size our array 824 // for 128. 825 static const int kNumPosixSignals = 128; 826 827 // There is just a single global instance. (Signal handlers do not get any 828 // sort of user-defined void * parameter, so they can't access anything that 829 // isn't global.) 830 static PosixSignalHandler* Instance() { 831 RTC_DEFINE_STATIC_LOCAL(PosixSignalHandler, instance, ()); 832 return &instance; 833 } 834 835 // Returns true if the given signal number is set. 836 bool IsSignalSet(int signum) const { 837 ASSERT(signum < static_cast<int>(arraysize(received_signal_))); 838 if (signum < static_cast<int>(arraysize(received_signal_))) { 839 return received_signal_[signum]; 840 } else { 841 return false; 842 } 843 } 844 845 // Clears the given signal number. 846 void ClearSignal(int signum) { 847 ASSERT(signum < static_cast<int>(arraysize(received_signal_))); 848 if (signum < static_cast<int>(arraysize(received_signal_))) { 849 received_signal_[signum] = false; 850 } 851 } 852 853 // Returns the file descriptor to monitor for signal events. 854 int GetDescriptor() const { 855 return afd_[0]; 856 } 857 858 // This is called directly from our real signal handler, so it must be 859 // signal-handler-safe. That means it cannot assume anything about the 860 // user-level state of the process, since the handler could be executed at any 861 // time on any thread. 862 void OnPosixSignalReceived(int signum) { 863 if (signum >= static_cast<int>(arraysize(received_signal_))) { 864 // We don't have space in our array for this. 865 return; 866 } 867 // Set a flag saying we've seen this signal. 868 received_signal_[signum] = true; 869 // Notify application code that we got a signal. 870 const uint8_t b[1] = {0}; 871 if (-1 == write(afd_[1], b, sizeof(b))) { 872 // Nothing we can do here. If there's an error somehow then there's 873 // nothing we can safely do from a signal handler. 874 // No, we can't even safely log it. 875 // But, we still have to check the return value here. Otherwise, 876 // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help. 877 return; 878 } 879 } 880 881 private: 882 PosixSignalHandler() { 883 if (pipe(afd_) < 0) { 884 LOG_ERR(LS_ERROR) << "pipe failed"; 885 return; 886 } 887 if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) { 888 LOG_ERR(LS_WARNING) << "fcntl #1 failed"; 889 } 890 if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) { 891 LOG_ERR(LS_WARNING) << "fcntl #2 failed"; 892 } 893 memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)), 894 0, 895 sizeof(received_signal_)); 896 } 897 898 ~PosixSignalHandler() { 899 int fd1 = afd_[0]; 900 int fd2 = afd_[1]; 901 // We clobber the stored file descriptor numbers here or else in principle 902 // a signal that happens to be delivered during application termination 903 // could erroneously write a zero byte to an unrelated file handle in 904 // OnPosixSignalReceived() if some other file happens to be opened later 905 // during shutdown and happens to be given the same file descriptor number 906 // as our pipe had. Unfortunately even with this precaution there is still a 907 // race where that could occur if said signal happens to be handled 908 // concurrently with this code and happens to have already read the value of 909 // afd_[1] from memory before we clobber it, but that's unlikely. 910 afd_[0] = -1; 911 afd_[1] = -1; 912 close(fd1); 913 close(fd2); 914 } 915 916 int afd_[2]; 917 // These are boolean flags that will be set in our signal handler and read 918 // and cleared from Wait(). There is a race involved in this, but it is 919 // benign. The signal handler sets the flag before signaling the pipe, so 920 // we'll never end up blocking in select() while a flag is still true. 921 // However, if two of the same signal arrive close to each other then it's 922 // possible that the second time the handler may set the flag while it's still 923 // true, meaning that signal will be missed. But the first occurrence of it 924 // will still be handled, so this isn't a problem. 925 // Volatile is not necessary here for correctness, but this data _is_ volatile 926 // so I've marked it as such. 927 volatile uint8_t received_signal_[kNumPosixSignals]; 928}; 929 930class PosixSignalDispatcher : public Dispatcher { 931 public: 932 PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) { 933 owner_->Add(this); 934 } 935 936 ~PosixSignalDispatcher() override { 937 owner_->Remove(this); 938 } 939 940 uint32_t GetRequestedEvents() override { return DE_READ; } 941 942 void OnPreEvent(uint32_t ff) override { 943 // Events might get grouped if signals come very fast, so we read out up to 944 // 16 bytes to make sure we keep the pipe empty. 945 uint8_t b[16]; 946 ssize_t ret = read(GetDescriptor(), b, sizeof(b)); 947 if (ret < 0) { 948 LOG_ERR(LS_WARNING) << "Error in read()"; 949 } else if (ret == 0) { 950 LOG(LS_WARNING) << "Should have read at least one byte"; 951 } 952 } 953 954 void OnEvent(uint32_t ff, int err) override { 955 for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals; 956 ++signum) { 957 if (PosixSignalHandler::Instance()->IsSignalSet(signum)) { 958 PosixSignalHandler::Instance()->ClearSignal(signum); 959 HandlerMap::iterator i = handlers_.find(signum); 960 if (i == handlers_.end()) { 961 // This can happen if a signal is delivered to our process at around 962 // the same time as we unset our handler for it. It is not an error 963 // condition, but it's unusual enough to be worth logging. 964 LOG(LS_INFO) << "Received signal with no handler: " << signum; 965 } else { 966 // Otherwise, execute our handler. 967 (*i->second)(signum); 968 } 969 } 970 } 971 } 972 973 int GetDescriptor() override { 974 return PosixSignalHandler::Instance()->GetDescriptor(); 975 } 976 977 bool IsDescriptorClosed() override { return false; } 978 979 void SetHandler(int signum, void (*handler)(int)) { 980 handlers_[signum] = handler; 981 } 982 983 void ClearHandler(int signum) { 984 handlers_.erase(signum); 985 } 986 987 bool HasHandlers() { 988 return !handlers_.empty(); 989 } 990 991 private: 992 typedef std::map<int, void (*)(int)> HandlerMap; 993 994 HandlerMap handlers_; 995 // Our owner. 996 PhysicalSocketServer *owner_; 997}; 998 999class FileDispatcher: public Dispatcher, public AsyncFile { 1000 public: 1001 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) { 1002 set_readable(true); 1003 1004 ss_->Add(this); 1005 1006 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK); 1007 } 1008 1009 ~FileDispatcher() override { 1010 ss_->Remove(this); 1011 } 1012 1013 SocketServer* socketserver() { return ss_; } 1014 1015 int GetDescriptor() override { return fd_; } 1016 1017 bool IsDescriptorClosed() override { return false; } 1018 1019 uint32_t GetRequestedEvents() override { return flags_; } 1020 1021 void OnPreEvent(uint32_t ff) override {} 1022 1023 void OnEvent(uint32_t ff, int err) override { 1024 if ((ff & DE_READ) != 0) 1025 SignalReadEvent(this); 1026 if ((ff & DE_WRITE) != 0) 1027 SignalWriteEvent(this); 1028 if ((ff & DE_CLOSE) != 0) 1029 SignalCloseEvent(this, err); 1030 } 1031 1032 bool readable() override { return (flags_ & DE_READ) != 0; } 1033 1034 void set_readable(bool value) override { 1035 flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ); 1036 } 1037 1038 bool writable() override { return (flags_ & DE_WRITE) != 0; } 1039 1040 void set_writable(bool value) override { 1041 flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE); 1042 } 1043 1044 private: 1045 PhysicalSocketServer* ss_; 1046 int fd_; 1047 int flags_; 1048}; 1049 1050AsyncFile* PhysicalSocketServer::CreateFile(int fd) { 1051 return new FileDispatcher(fd, this); 1052} 1053 1054#endif // WEBRTC_POSIX 1055 1056#if defined(WEBRTC_WIN) 1057static uint32_t FlagsToEvents(uint32_t events) { 1058 uint32_t ffFD = FD_CLOSE; 1059 if (events & DE_READ) 1060 ffFD |= FD_READ; 1061 if (events & DE_WRITE) 1062 ffFD |= FD_WRITE; 1063 if (events & DE_CONNECT) 1064 ffFD |= FD_CONNECT; 1065 if (events & DE_ACCEPT) 1066 ffFD |= FD_ACCEPT; 1067 return ffFD; 1068} 1069 1070class EventDispatcher : public Dispatcher { 1071 public: 1072 EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) { 1073 hev_ = WSACreateEvent(); 1074 if (hev_) { 1075 ss_->Add(this); 1076 } 1077 } 1078 1079 ~EventDispatcher() { 1080 if (hev_ != NULL) { 1081 ss_->Remove(this); 1082 WSACloseEvent(hev_); 1083 hev_ = NULL; 1084 } 1085 } 1086 1087 virtual void Signal() { 1088 if (hev_ != NULL) 1089 WSASetEvent(hev_); 1090 } 1091 1092 virtual uint32_t GetRequestedEvents() { return 0; } 1093 1094 virtual void OnPreEvent(uint32_t ff) { WSAResetEvent(hev_); } 1095 1096 virtual void OnEvent(uint32_t ff, int err) {} 1097 1098 virtual WSAEVENT GetWSAEvent() { 1099 return hev_; 1100 } 1101 1102 virtual SOCKET GetSocket() { 1103 return INVALID_SOCKET; 1104 } 1105 1106 virtual bool CheckSignalClose() { return false; } 1107 1108private: 1109 PhysicalSocketServer* ss_; 1110 WSAEVENT hev_; 1111}; 1112#endif // WEBRTC_WIN 1113 1114// Sets the value of a boolean value to false when signaled. 1115class Signaler : public EventDispatcher { 1116 public: 1117 Signaler(PhysicalSocketServer* ss, bool* pf) 1118 : EventDispatcher(ss), pf_(pf) { 1119 } 1120 ~Signaler() override { } 1121 1122 void OnEvent(uint32_t ff, int err) override { 1123 if (pf_) 1124 *pf_ = false; 1125 } 1126 1127 private: 1128 bool *pf_; 1129}; 1130 1131PhysicalSocketServer::PhysicalSocketServer() 1132 : fWait_(false) { 1133 signal_wakeup_ = new Signaler(this, &fWait_); 1134#if defined(WEBRTC_WIN) 1135 socket_ev_ = WSACreateEvent(); 1136#endif 1137} 1138 1139PhysicalSocketServer::~PhysicalSocketServer() { 1140#if defined(WEBRTC_WIN) 1141 WSACloseEvent(socket_ev_); 1142#endif 1143#if defined(WEBRTC_POSIX) 1144 signal_dispatcher_.reset(); 1145#endif 1146 delete signal_wakeup_; 1147 ASSERT(dispatchers_.empty()); 1148} 1149 1150void PhysicalSocketServer::WakeUp() { 1151 signal_wakeup_->Signal(); 1152} 1153 1154Socket* PhysicalSocketServer::CreateSocket(int type) { 1155 return CreateSocket(AF_INET, type); 1156} 1157 1158Socket* PhysicalSocketServer::CreateSocket(int family, int type) { 1159 PhysicalSocket* socket = new PhysicalSocket(this); 1160 if (socket->Create(family, type)) { 1161 return socket; 1162 } else { 1163 delete socket; 1164 return nullptr; 1165 } 1166} 1167 1168AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) { 1169 return CreateAsyncSocket(AF_INET, type); 1170} 1171 1172AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) { 1173 SocketDispatcher* dispatcher = new SocketDispatcher(this); 1174 if (dispatcher->Create(family, type)) { 1175 return dispatcher; 1176 } else { 1177 delete dispatcher; 1178 return nullptr; 1179 } 1180} 1181 1182AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { 1183 SocketDispatcher* dispatcher = new SocketDispatcher(s, this); 1184 if (dispatcher->Initialize()) { 1185 return dispatcher; 1186 } else { 1187 delete dispatcher; 1188 return nullptr; 1189 } 1190} 1191 1192void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1193 CritScope cs(&crit_); 1194 // Prevent duplicates. This can cause dead dispatchers to stick around. 1195 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1196 dispatchers_.end(), 1197 pdispatcher); 1198 if (pos != dispatchers_.end()) 1199 return; 1200 dispatchers_.push_back(pdispatcher); 1201} 1202 1203void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { 1204 CritScope cs(&crit_); 1205 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1206 dispatchers_.end(), 1207 pdispatcher); 1208 // We silently ignore duplicate calls to Add, so we should silently ignore 1209 // the (expected) symmetric calls to Remove. Note that this may still hide 1210 // a real issue, so we at least log a warning about it. 1211 if (pos == dispatchers_.end()) { 1212 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " 1213 << "dispatcher, potentially from a duplicate call to Add."; 1214 return; 1215 } 1216 size_t index = pos - dispatchers_.begin(); 1217 dispatchers_.erase(pos); 1218 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); 1219 ++it) { 1220 if (index < **it) { 1221 --**it; 1222 } 1223 } 1224} 1225 1226#if defined(WEBRTC_POSIX) 1227bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1228 // Calculate timing information 1229 1230 struct timeval *ptvWait = NULL; 1231 struct timeval tvWait; 1232 struct timeval tvStop; 1233 if (cmsWait != kForever) { 1234 // Calculate wait timeval 1235 tvWait.tv_sec = cmsWait / 1000; 1236 tvWait.tv_usec = (cmsWait % 1000) * 1000; 1237 ptvWait = &tvWait; 1238 1239 // Calculate when to return in a timeval 1240 gettimeofday(&tvStop, NULL); 1241 tvStop.tv_sec += tvWait.tv_sec; 1242 tvStop.tv_usec += tvWait.tv_usec; 1243 if (tvStop.tv_usec >= 1000000) { 1244 tvStop.tv_usec -= 1000000; 1245 tvStop.tv_sec += 1; 1246 } 1247 } 1248 1249 // Zero all fd_sets. Don't need to do this inside the loop since 1250 // select() zeros the descriptors not signaled 1251 1252 fd_set fdsRead; 1253 FD_ZERO(&fdsRead); 1254 fd_set fdsWrite; 1255 FD_ZERO(&fdsWrite); 1256 // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the 1257 // inline assembly in FD_ZERO. 1258 // http://crbug.com/344505 1259#ifdef MEMORY_SANITIZER 1260 __msan_unpoison(&fdsRead, sizeof(fdsRead)); 1261 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); 1262#endif 1263 1264 fWait_ = true; 1265 1266 while (fWait_) { 1267 int fdmax = -1; 1268 { 1269 CritScope cr(&crit_); 1270 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1271 // Query dispatchers for read and write wait state 1272 Dispatcher *pdispatcher = dispatchers_[i]; 1273 ASSERT(pdispatcher); 1274 if (!process_io && (pdispatcher != signal_wakeup_)) 1275 continue; 1276 int fd = pdispatcher->GetDescriptor(); 1277 if (fd > fdmax) 1278 fdmax = fd; 1279 1280 uint32_t ff = pdispatcher->GetRequestedEvents(); 1281 if (ff & (DE_READ | DE_ACCEPT)) 1282 FD_SET(fd, &fdsRead); 1283 if (ff & (DE_WRITE | DE_CONNECT)) 1284 FD_SET(fd, &fdsWrite); 1285 } 1286 } 1287 1288 // Wait then call handlers as appropriate 1289 // < 0 means error 1290 // 0 means timeout 1291 // > 0 means count of descriptors ready 1292 int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait); 1293 1294 // If error, return error. 1295 if (n < 0) { 1296 if (errno != EINTR) { 1297 LOG_E(LS_ERROR, EN, errno) << "select"; 1298 return false; 1299 } 1300 // Else ignore the error and keep going. If this EINTR was for one of the 1301 // signals managed by this PhysicalSocketServer, the 1302 // PosixSignalDeliveryDispatcher will be in the signaled state in the next 1303 // iteration. 1304 } else if (n == 0) { 1305 // If timeout, return success 1306 return true; 1307 } else { 1308 // We have signaled descriptors 1309 CritScope cr(&crit_); 1310 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1311 Dispatcher *pdispatcher = dispatchers_[i]; 1312 int fd = pdispatcher->GetDescriptor(); 1313 uint32_t ff = 0; 1314 int errcode = 0; 1315 1316 // Reap any error code, which can be signaled through reads or writes. 1317 // TODO(pthatcher): Should we set errcode if getsockopt fails? 1318 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1319 socklen_t len = sizeof(errcode); 1320 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); 1321 } 1322 1323 // Check readable descriptors. If we're waiting on an accept, signal 1324 // that. Otherwise we're waiting for data, check to see if we're 1325 // readable or really closed. 1326 // TODO(pthatcher): Only peek at TCP descriptors. 1327 if (FD_ISSET(fd, &fdsRead)) { 1328 FD_CLR(fd, &fdsRead); 1329 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { 1330 ff |= DE_ACCEPT; 1331 } else if (errcode || pdispatcher->IsDescriptorClosed()) { 1332 ff |= DE_CLOSE; 1333 } else { 1334 ff |= DE_READ; 1335 } 1336 } 1337 1338 // Check writable descriptors. If we're waiting on a connect, detect 1339 // success versus failure by the reaped error code. 1340 if (FD_ISSET(fd, &fdsWrite)) { 1341 FD_CLR(fd, &fdsWrite); 1342 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) { 1343 if (!errcode) { 1344 ff |= DE_CONNECT; 1345 } else { 1346 ff |= DE_CLOSE; 1347 } 1348 } else { 1349 ff |= DE_WRITE; 1350 } 1351 } 1352 1353 // Tell the descriptor about the event. 1354 if (ff != 0) { 1355 pdispatcher->OnPreEvent(ff); 1356 pdispatcher->OnEvent(ff, errcode); 1357 } 1358 } 1359 } 1360 1361 // Recalc the time remaining to wait. Doing it here means it doesn't get 1362 // calced twice the first time through the loop 1363 if (ptvWait) { 1364 ptvWait->tv_sec = 0; 1365 ptvWait->tv_usec = 0; 1366 struct timeval tvT; 1367 gettimeofday(&tvT, NULL); 1368 if ((tvStop.tv_sec > tvT.tv_sec) 1369 || ((tvStop.tv_sec == tvT.tv_sec) 1370 && (tvStop.tv_usec > tvT.tv_usec))) { 1371 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; 1372 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; 1373 if (ptvWait->tv_usec < 0) { 1374 ASSERT(ptvWait->tv_sec > 0); 1375 ptvWait->tv_usec += 1000000; 1376 ptvWait->tv_sec -= 1; 1377 } 1378 } 1379 } 1380 } 1381 1382 return true; 1383} 1384 1385static void GlobalSignalHandler(int signum) { 1386 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); 1387} 1388 1389bool PhysicalSocketServer::SetPosixSignalHandler(int signum, 1390 void (*handler)(int)) { 1391 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, 1392 // otherwise set one. 1393 if (handler == SIG_IGN || handler == SIG_DFL) { 1394 if (!InstallSignal(signum, handler)) { 1395 return false; 1396 } 1397 if (signal_dispatcher_) { 1398 signal_dispatcher_->ClearHandler(signum); 1399 if (!signal_dispatcher_->HasHandlers()) { 1400 signal_dispatcher_.reset(); 1401 } 1402 } 1403 } else { 1404 if (!signal_dispatcher_) { 1405 signal_dispatcher_.reset(new PosixSignalDispatcher(this)); 1406 } 1407 signal_dispatcher_->SetHandler(signum, handler); 1408 if (!InstallSignal(signum, &GlobalSignalHandler)) { 1409 return false; 1410 } 1411 } 1412 return true; 1413} 1414 1415Dispatcher* PhysicalSocketServer::signal_dispatcher() { 1416 return signal_dispatcher_.get(); 1417} 1418 1419bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) { 1420 struct sigaction act; 1421 // It doesn't really matter what we set this mask to. 1422 if (sigemptyset(&act.sa_mask) != 0) { 1423 LOG_ERR(LS_ERROR) << "Couldn't set mask"; 1424 return false; 1425 } 1426 act.sa_handler = handler; 1427#if !defined(__native_client__) 1428 // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it 1429 // and it's a nuisance. Though some syscalls still return EINTR and there's no 1430 // real standard for which ones. :( 1431 act.sa_flags = SA_RESTART; 1432#else 1433 act.sa_flags = 0; 1434#endif 1435 if (sigaction(signum, &act, NULL) != 0) { 1436 LOG_ERR(LS_ERROR) << "Couldn't set sigaction"; 1437 return false; 1438 } 1439 return true; 1440} 1441#endif // WEBRTC_POSIX 1442 1443#if defined(WEBRTC_WIN) 1444bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1445 int cmsTotal = cmsWait; 1446 int cmsElapsed = 0; 1447 uint32_t msStart = Time(); 1448 1449 fWait_ = true; 1450 while (fWait_) { 1451 std::vector<WSAEVENT> events; 1452 std::vector<Dispatcher *> event_owners; 1453 1454 events.push_back(socket_ev_); 1455 1456 { 1457 CritScope cr(&crit_); 1458 size_t i = 0; 1459 iterators_.push_back(&i); 1460 // Don't track dispatchers_.size(), because we want to pick up any new 1461 // dispatchers that were added while processing the loop. 1462 while (i < dispatchers_.size()) { 1463 Dispatcher* disp = dispatchers_[i++]; 1464 if (!process_io && (disp != signal_wakeup_)) 1465 continue; 1466 SOCKET s = disp->GetSocket(); 1467 if (disp->CheckSignalClose()) { 1468 // We just signalled close, don't poll this socket 1469 } else if (s != INVALID_SOCKET) { 1470 WSAEventSelect(s, 1471 events[0], 1472 FlagsToEvents(disp->GetRequestedEvents())); 1473 } else { 1474 events.push_back(disp->GetWSAEvent()); 1475 event_owners.push_back(disp); 1476 } 1477 } 1478 ASSERT(iterators_.back() == &i); 1479 iterators_.pop_back(); 1480 } 1481 1482 // Which is shorter, the delay wait or the asked wait? 1483 1484 int cmsNext; 1485 if (cmsWait == kForever) { 1486 cmsNext = cmsWait; 1487 } else { 1488 cmsNext = std::max(0, cmsTotal - cmsElapsed); 1489 } 1490 1491 // Wait for one of the events to signal 1492 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), 1493 &events[0], 1494 false, 1495 cmsNext, 1496 false); 1497 1498 if (dw == WSA_WAIT_FAILED) { 1499 // Failed? 1500 // TODO(pthatcher): need a better strategy than this! 1501 WSAGetLastError(); 1502 ASSERT(false); 1503 return false; 1504 } else if (dw == WSA_WAIT_TIMEOUT) { 1505 // Timeout? 1506 return true; 1507 } else { 1508 // Figure out which one it is and call it 1509 CritScope cr(&crit_); 1510 int index = dw - WSA_WAIT_EVENT_0; 1511 if (index > 0) { 1512 --index; // The first event is the socket event 1513 event_owners[index]->OnPreEvent(0); 1514 event_owners[index]->OnEvent(0, 0); 1515 } else if (process_io) { 1516 size_t i = 0, end = dispatchers_.size(); 1517 iterators_.push_back(&i); 1518 iterators_.push_back(&end); // Don't iterate over new dispatchers. 1519 while (i < end) { 1520 Dispatcher* disp = dispatchers_[i++]; 1521 SOCKET s = disp->GetSocket(); 1522 if (s == INVALID_SOCKET) 1523 continue; 1524 1525 WSANETWORKEVENTS wsaEvents; 1526 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents); 1527 if (err == 0) { 1528 1529#if LOGGING 1530 { 1531 if ((wsaEvents.lNetworkEvents & FD_READ) && 1532 wsaEvents.iErrorCode[FD_READ_BIT] != 0) { 1533 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error " 1534 << wsaEvents.iErrorCode[FD_READ_BIT]; 1535 } 1536 if ((wsaEvents.lNetworkEvents & FD_WRITE) && 1537 wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) { 1538 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error " 1539 << wsaEvents.iErrorCode[FD_WRITE_BIT]; 1540 } 1541 if ((wsaEvents.lNetworkEvents & FD_CONNECT) && 1542 wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) { 1543 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error " 1544 << wsaEvents.iErrorCode[FD_CONNECT_BIT]; 1545 } 1546 if ((wsaEvents.lNetworkEvents & FD_ACCEPT) && 1547 wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) { 1548 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error " 1549 << wsaEvents.iErrorCode[FD_ACCEPT_BIT]; 1550 } 1551 if ((wsaEvents.lNetworkEvents & FD_CLOSE) && 1552 wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) { 1553 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error " 1554 << wsaEvents.iErrorCode[FD_CLOSE_BIT]; 1555 } 1556 } 1557#endif 1558 uint32_t ff = 0; 1559 int errcode = 0; 1560 if (wsaEvents.lNetworkEvents & FD_READ) 1561 ff |= DE_READ; 1562 if (wsaEvents.lNetworkEvents & FD_WRITE) 1563 ff |= DE_WRITE; 1564 if (wsaEvents.lNetworkEvents & FD_CONNECT) { 1565 if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) { 1566 ff |= DE_CONNECT; 1567 } else { 1568 ff |= DE_CLOSE; 1569 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT]; 1570 } 1571 } 1572 if (wsaEvents.lNetworkEvents & FD_ACCEPT) 1573 ff |= DE_ACCEPT; 1574 if (wsaEvents.lNetworkEvents & FD_CLOSE) { 1575 ff |= DE_CLOSE; 1576 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT]; 1577 } 1578 if (ff != 0) { 1579 disp->OnPreEvent(ff); 1580 disp->OnEvent(ff, errcode); 1581 } 1582 } 1583 } 1584 ASSERT(iterators_.back() == &end); 1585 iterators_.pop_back(); 1586 ASSERT(iterators_.back() == &i); 1587 iterators_.pop_back(); 1588 } 1589 1590 // Reset the network event until new activity occurs 1591 WSAResetEvent(socket_ev_); 1592 } 1593 1594 // Break? 1595 if (!fWait_) 1596 break; 1597 cmsElapsed = TimeSince(msStart); 1598 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) { 1599 break; 1600 } 1601 } 1602 1603 // Done 1604 return true; 1605} 1606#endif // WEBRTC_WIN 1607 1608} // namespace rtc 1609