/*
 * libjingle
 * Copyright 2004--2005, Google Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *  3. The name of the author may not be used to endorse or promote products
 *     derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */ 27 28#if defined(_MSC_VER) && _MSC_VER < 1300 29#pragma warning(disable:4786) 30#endif 31 32#include <cassert> 33 34#ifdef POSIX 35#include <string.h> 36#include <errno.h> 37#include <fcntl.h> 38#include <sys/time.h> 39#include <unistd.h> 40#include <signal.h> 41#endif 42 43#ifdef WIN32 44#define WIN32_LEAN_AND_MEAN 45#include <windows.h> 46#include <winsock2.h> 47#include <ws2tcpip.h> 48#undef SetPort 49#endif 50 51#include <algorithm> 52#include <map> 53 54#include "talk/base/basictypes.h" 55#include "talk/base/byteorder.h" 56#include "talk/base/common.h" 57#include "talk/base/logging.h" 58#include "talk/base/nethelpers.h" 59#include "talk/base/physicalsocketserver.h" 60#include "talk/base/timeutils.h" 61#include "talk/base/winping.h" 62#include "talk/base/win32socketinit.h" 63 64// stm: this will tell us if we are on OSX 65#ifdef HAVE_CONFIG_H 66#include "config.h" 67#endif 68 69#ifdef POSIX 70#include <netinet/tcp.h> // for TCP_NODELAY 71#define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h 72typedef void* SockOptArg; 73#endif // POSIX 74 75#ifdef WIN32 76typedef char* SockOptArg; 77#endif 78 79namespace talk_base { 80 81// Standard MTUs, from RFC 1191 82const uint16 PACKET_MAXIMUMS[] = { 83 65535, // Theoretical maximum, Hyperchannel 84 32000, // Nothing 85 17914, // 16Mb IBM Token Ring 86 8166, // IEEE 802.4 87 //4464, // IEEE 802.5 (4Mb max) 88 4352, // FDDI 89 //2048, // Wideband Network 90 2002, // IEEE 802.5 (4Mb recommended) 91 //1536, // Expermental Ethernet Networks 92 //1500, // Ethernet, Point-to-Point (default) 93 1492, // IEEE 802.3 94 1006, // SLIP, ARPANET 95 //576, // X.25 Networks 96 //544, // DEC IP Portal 97 //512, // NETBIOS 98 508, // IEEE 802/Source-Rt Bridge, ARCNET 99 296, // Point-to-Point (low delay) 100 68, // Official minimum 101 0, // End of list marker 102}; 103 104static const int IP_HEADER_SIZE = 20u; 105static const int IPV6_HEADER_SIZE = 40u; 106static const int ICMP_HEADER_SIZE = 8u; 107static 
const int ICMP_PING_TIMEOUT_MILLIS = 10000u; 108 109class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { 110 public: 111 PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET) 112 : ss_(ss), s_(s), enabled_events_(0), error_(0), 113 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), 114 resolver_(NULL) { 115#ifdef WIN32 116 // EnsureWinsockInit() ensures that winsock is initialized. The default 117 // version of this function doesn't do anything because winsock is 118 // initialized by constructor of a static object. If neccessary libjingle 119 // users can link it with a different version of this function by replacing 120 // win32socketinit.cc. See win32socketinit.cc for more details. 121 EnsureWinsockInit(); 122#endif 123 if (s_ != INVALID_SOCKET) { 124 enabled_events_ = DE_READ | DE_WRITE; 125 126 int type = SOCK_STREAM; 127 socklen_t len = sizeof(type); 128 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len)); 129 udp_ = (SOCK_DGRAM == type); 130 } 131 } 132 133 virtual ~PhysicalSocket() { 134 Close(); 135 } 136 137 // Creates the underlying OS socket (same as the "socket" function). 
138 virtual bool Create(int family, int type) { 139 Close(); 140 s_ = ::socket(family, type, 0); 141 udp_ = (SOCK_DGRAM == type); 142 UpdateLastError(); 143 if (udp_) 144 enabled_events_ = DE_READ | DE_WRITE; 145 return s_ != INVALID_SOCKET; 146 } 147 148 SocketAddress GetLocalAddress() const { 149 sockaddr_storage addr_storage = {0}; 150 socklen_t addrlen = sizeof(addr_storage); 151 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 152 int result = ::getsockname(s_, addr, &addrlen); 153 SocketAddress address; 154 if (result >= 0) { 155 SocketAddressFromSockAddrStorage(addr_storage, &address); 156 } else { 157 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket=" 158 << s_; 159 } 160 return address; 161 } 162 163 SocketAddress GetRemoteAddress() const { 164 sockaddr_storage addr_storage = {0}; 165 socklen_t addrlen = sizeof(addr_storage); 166 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 167 int result = ::getpeername(s_, addr, &addrlen); 168 SocketAddress address; 169 if (result >= 0) { 170 SocketAddressFromSockAddrStorage(addr_storage, &address); 171 } else { 172 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket=" 173 << s_; 174 } 175 return address; 176 } 177 178 int Bind(const SocketAddress& bind_addr) { 179 sockaddr_storage addr_storage; 180 size_t len = bind_addr.ToSockAddrStorage(&addr_storage); 181 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 182 int err = ::bind(s_, addr, static_cast<int>(len)); 183 UpdateLastError(); 184#ifdef _DEBUG 185 if (0 == err) { 186 dbg_addr_ = "Bound @ "; 187 dbg_addr_.append(GetLocalAddress().ToString()); 188 } 189#endif // _DEBUG 190 return err; 191 } 192 193 int Connect(const SocketAddress& addr) { 194 // TODO: Implicit creation is required to reconnect... 195 // ...but should we make it more explicit? 
196 if (state_ != CS_CLOSED) { 197 SetError(EALREADY); 198 return SOCKET_ERROR; 199 } 200 if (addr.IsUnresolved()) { 201 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect"; 202 resolver_ = new AsyncResolver(); 203 resolver_->set_address(addr); 204 resolver_->SignalWorkDone.connect(this, &PhysicalSocket::OnResolveResult); 205 resolver_->Start(); 206 state_ = CS_CONNECTING; 207 return 0; 208 } 209 210 return DoConnect(addr); 211 } 212 213 int DoConnect(const SocketAddress& connect_addr) { 214 if ((s_ == INVALID_SOCKET) && 215 !Create(connect_addr.family(), SOCK_STREAM)) { 216 return SOCKET_ERROR; 217 } 218 sockaddr_storage addr_storage; 219 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); 220 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 221 int err = ::connect(s_, addr, static_cast<int>(len)); 222 UpdateLastError(); 223 if (err == 0) { 224 state_ = CS_CONNECTED; 225 } else if (IsBlockingError(error_)) { 226 state_ = CS_CONNECTING; 227 enabled_events_ |= DE_CONNECT; 228 } else { 229 return SOCKET_ERROR; 230 } 231 232 enabled_events_ |= DE_READ | DE_WRITE; 233 return 0; 234 } 235 236 int GetError() const { 237 return error_; 238 } 239 240 void SetError(int error) { 241 error_ = error; 242 } 243 244 ConnState GetState() const { 245 return state_; 246 } 247 248 int GetOption(Option opt, int* value) { 249 int slevel; 250 int sopt; 251 if (TranslateOption(opt, &slevel, &sopt) == -1) 252 return -1; 253 socklen_t optlen = sizeof(*value); 254 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen); 255 if (ret != -1 && opt == OPT_DONTFRAGMENT) { 256#ifdef LINUX 257 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0; 258#endif 259 } 260 return ret; 261 } 262 263 int SetOption(Option opt, int value) { 264 int slevel; 265 int sopt; 266 if (TranslateOption(opt, &slevel, &sopt) == -1) 267 return -1; 268 if (opt == OPT_DONTFRAGMENT) { 269#ifdef LINUX 270 value = (value) ? 
IP_PMTUDISC_DO : IP_PMTUDISC_DONT; 271#endif 272 } 273 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value)); 274 } 275 276 int Send(const void *pv, size_t cb) { 277 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb, 278#ifdef LINUX 279 // Suppress SIGPIPE. Without this, attempting to send on a socket whose 280 // other end is closed will result in a SIGPIPE signal being raised to 281 // our process, which by default will terminate the process, which we 282 // don't want. By specifying this flag, we'll just get the error EPIPE 283 // instead and can handle the error gracefully. 284 MSG_NOSIGNAL 285#else 286 0 287#endif 288 ); 289 UpdateLastError(); 290 MaybeRemapSendError(); 291 // We have seen minidumps where this may be false. 292 ASSERT(sent <= static_cast<int>(cb)); 293 if ((sent < 0) && IsBlockingError(error_)) { 294 enabled_events_ |= DE_WRITE; 295 } 296 return sent; 297 } 298 299 int SendTo(const void* buffer, size_t length, const SocketAddress& addr) { 300 sockaddr_storage saddr; 301 size_t len = addr.ToSockAddrStorage(&saddr); 302 int sent = ::sendto( 303 s_, static_cast<const char *>(buffer), static_cast<int>(length), 304#ifdef LINUX 305 // Suppress SIGPIPE. See above for explanation. 306 MSG_NOSIGNAL, 307#else 308 0, 309#endif 310 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len)); 311 UpdateLastError(); 312 MaybeRemapSendError(); 313 // We have seen minidumps where this may be false. 314 ASSERT(sent <= static_cast<int>(length)); 315 if ((sent < 0) && IsBlockingError(error_)) { 316 enabled_events_ |= DE_WRITE; 317 } 318 return sent; 319 } 320 321 int Recv(void* buffer, size_t length) { 322 int received = ::recv(s_, static_cast<char*>(buffer), 323 static_cast<int>(length), 0); 324 if ((received == 0) && (length != 0)) { 325 // Note: on graceful shutdown, recv can return 0. In this case, we 326 // pretend it is blocking, and then signal close, so that simplifying 327 // assumptions can be made about Recv. 
328 LOG(LS_WARNING) << "EOF from socket; deferring close event"; 329 // Must turn this back on so that the select() loop will notice the close 330 // event. 331 enabled_events_ |= DE_READ; 332 error_ = EWOULDBLOCK; 333 return SOCKET_ERROR; 334 } 335 UpdateLastError(); 336 bool success = (received >= 0) || IsBlockingError(error_); 337 if (udp_ || success) { 338 enabled_events_ |= DE_READ; 339 } 340 if (!success) { 341 LOG_F(LS_VERBOSE) << "Error = " << error_; 342 } 343 return received; 344 } 345 346 int RecvFrom(void* buffer, size_t length, SocketAddress *out_addr) { 347 sockaddr_storage addr_storage; 348 socklen_t addr_len = sizeof(addr_storage); 349 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 350 int received = ::recvfrom(s_, static_cast<char*>(buffer), 351 static_cast<int>(length), 0, addr, &addr_len); 352 UpdateLastError(); 353 if ((received >= 0) && (out_addr != NULL)) 354 SocketAddressFromSockAddrStorage(addr_storage, out_addr); 355 bool success = (received >= 0) || IsBlockingError(error_); 356 if (udp_ || success) { 357 enabled_events_ |= DE_READ; 358 } 359 if (!success) { 360 LOG_F(LS_VERBOSE) << "Error = " << error_; 361 } 362 return received; 363 } 364 365 int Listen(int backlog) { 366 int err = ::listen(s_, backlog); 367 UpdateLastError(); 368 if (err == 0) { 369 state_ = CS_CONNECTING; 370 enabled_events_ |= DE_ACCEPT; 371#ifdef _DEBUG 372 dbg_addr_ = "Listening @ "; 373 dbg_addr_.append(GetLocalAddress().ToString()); 374#endif // _DEBUG 375 } 376 return err; 377 } 378 379 AsyncSocket* Accept(SocketAddress *out_addr) { 380 sockaddr_storage addr_storage; 381 socklen_t addr_len = sizeof(addr_storage); 382 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 383 SOCKET s = ::accept(s_, addr, &addr_len); 384 UpdateLastError(); 385 if (s == INVALID_SOCKET) 386 return NULL; 387 enabled_events_ |= DE_ACCEPT; 388 if (out_addr != NULL) 389 SocketAddressFromSockAddrStorage(addr_storage, out_addr); 390 return ss_->WrapSocket(s); 391 } 392 
393 int Close() { 394 if (s_ == INVALID_SOCKET) 395 return 0; 396 int err = ::closesocket(s_); 397 UpdateLastError(); 398 s_ = INVALID_SOCKET; 399 state_ = CS_CLOSED; 400 enabled_events_ = 0; 401 if (resolver_) { 402 resolver_->Destroy(false); 403 resolver_ = NULL; 404 } 405 return err; 406 } 407 408 int EstimateMTU(uint16* mtu) { 409 SocketAddress addr = GetRemoteAddress(); 410 if (addr.IsAny()) { 411 error_ = ENOTCONN; 412 return -1; 413 } 414 415#if defined(WIN32) 416 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|. 417 WinPing ping; 418 if (!ping.IsValid()) { 419 error_ = EINVAL; // can't think of a better error ID 420 return -1; 421 } 422 int header_size = ICMP_HEADER_SIZE; 423 if (addr.family() == AF_INET6) { 424 header_size += IPV6_HEADER_SIZE; 425 } else if (addr.family() == AF_INET) { 426 header_size += IP_HEADER_SIZE; 427 } 428 429 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) { 430 int32 size = PACKET_MAXIMUMS[level] - header_size; 431 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size, 432 ICMP_PING_TIMEOUT_MILLIS, 433 1, false); 434 if (result == WinPing::PING_FAIL) { 435 error_ = EINVAL; // can't think of a better error ID 436 return -1; 437 } else if (result != WinPing::PING_TOO_LARGE) { 438 *mtu = PACKET_MAXIMUMS[level]; 439 return 0; 440 } 441 } 442 443 ASSERT(false); 444 return -1; 445#elif defined(IOS) || defined(OSX) 446 // No simple way to do this on Mac OS X. 447 // SIOCGIFMTU would work if we knew which interface would be used, but 448 // figuring that out is pretty complicated. For now we'll return an error 449 // and let the caller pick a default MTU. 450 error_ = EINVAL; 451 return -1; 452#elif defined(LINUX) || defined(ANDROID) 453 // Gets the path MTU. 
454 int value; 455 socklen_t vlen = sizeof(value); 456 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen); 457 if (err < 0) { 458 UpdateLastError(); 459 return err; 460 } 461 462 ASSERT((0 <= value) && (value <= 65536)); 463 *mtu = value; 464 return 0; 465#endif 466 } 467 468 SocketServer* socketserver() { return ss_; } 469 470 protected: 471 void OnResolveResult(SignalThread* thread) { 472 if (thread != resolver_) { 473 return; 474 } 475 476 int error = resolver_->error(); 477 if (error == 0) { 478 error = DoConnect(resolver_->address()); 479 } else { 480 Close(); 481 } 482 483 if (error) { 484 error_ = error; 485 SignalCloseEvent(this, error_); 486 } 487 } 488 489 void UpdateLastError() { 490 error_ = LAST_SYSTEM_ERROR; 491 } 492 493 void MaybeRemapSendError() { 494#if defined(OSX) 495 // https://developer.apple.com/library/mac/documentation/Darwin/ 496 // Reference/ManPages/man2/sendto.2.html 497 // ENOBUFS - The output queue for a network interface is full. 498 // This generally indicates that the interface has stopped sending, 499 // but may be caused by transient congestion. 
500 if (error_ == ENOBUFS) { 501 error_ = EWOULDBLOCK; 502 } 503#endif 504 } 505 506 static int TranslateOption(Option opt, int* slevel, int* sopt) { 507 switch (opt) { 508 case OPT_DONTFRAGMENT: 509#ifdef WIN32 510 *slevel = IPPROTO_IP; 511 *sopt = IP_DONTFRAGMENT; 512 break; 513#elif defined(IOS) || defined(OSX) || defined(BSD) 514 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported."; 515 return -1; 516#elif defined(POSIX) 517 *slevel = IPPROTO_IP; 518 *sopt = IP_MTU_DISCOVER; 519 break; 520#endif 521 case OPT_RCVBUF: 522 *slevel = SOL_SOCKET; 523 *sopt = SO_RCVBUF; 524 break; 525 case OPT_SNDBUF: 526 *slevel = SOL_SOCKET; 527 *sopt = SO_SNDBUF; 528 break; 529 case OPT_NODELAY: 530 *slevel = IPPROTO_TCP; 531 *sopt = TCP_NODELAY; 532 break; 533 default: 534 ASSERT(false); 535 return -1; 536 } 537 return 0; 538 } 539 540 PhysicalSocketServer* ss_; 541 SOCKET s_; 542 uint8 enabled_events_; 543 bool udp_; 544 int error_; 545 ConnState state_; 546 AsyncResolver* resolver_; 547 548#ifdef _DEBUG 549 std::string dbg_addr_; 550#endif // _DEBUG; 551}; 552 553#ifdef POSIX 554class EventDispatcher : public Dispatcher { 555 public: 556 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) { 557 if (pipe(afd_) < 0) 558 LOG(LERROR) << "pipe failed"; 559 ss_->Add(this); 560 } 561 562 virtual ~EventDispatcher() { 563 ss_->Remove(this); 564 close(afd_[0]); 565 close(afd_[1]); 566 } 567 568 virtual void Signal() { 569 CritScope cs(&crit_); 570 if (!fSignaled_) { 571 const uint8 b[1] = { 0 }; 572 if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) { 573 fSignaled_ = true; 574 } 575 } 576 } 577 578 virtual uint32 GetRequestedEvents() { 579 return DE_READ; 580 } 581 582 virtual void OnPreEvent(uint32 ff) { 583 // It is not possible to perfectly emulate an auto-resetting event with 584 // pipes. This simulates it by resetting before the event is handled. 
585 586 CritScope cs(&crit_); 587 if (fSignaled_) { 588 uint8 b[4]; // Allow for reading more than 1 byte, but expect 1. 589 VERIFY(1 == read(afd_[0], b, sizeof(b))); 590 fSignaled_ = false; 591 } 592 } 593 594 virtual void OnEvent(uint32 ff, int err) { 595 ASSERT(false); 596 } 597 598 virtual int GetDescriptor() { 599 return afd_[0]; 600 } 601 602 virtual bool IsDescriptorClosed() { 603 return false; 604 } 605 606 private: 607 PhysicalSocketServer *ss_; 608 int afd_[2]; 609 bool fSignaled_; 610 CriticalSection crit_; 611}; 612 613// These two classes use the self-pipe trick to deliver POSIX signals to our 614// select loop. This is the only safe, reliable, cross-platform way to do 615// non-trivial things with a POSIX signal in an event-driven program (until 616// proper pselect() implementations become ubiquitous). 617 618class PosixSignalHandler { 619 public: 620 // POSIX only specifies 32 signals, but in principle the system might have 621 // more and the programmer might choose to use them, so we size our array 622 // for 128. 623 static const int kNumPosixSignals = 128; 624 625 // There is just a single global instance. (Signal handlers do not get any 626 // sort of user-defined void * parameter, so they can't access anything that 627 // isn't global.) 628 static PosixSignalHandler* Instance() { 629 LIBJINGLE_DEFINE_STATIC_LOCAL(PosixSignalHandler, instance, ()); 630 return &instance; 631 } 632 633 // Returns true if the given signal number is set. 634 bool IsSignalSet(int signum) const { 635 ASSERT(signum < ARRAY_SIZE(received_signal_)); 636 if (signum < ARRAY_SIZE(received_signal_)) { 637 return received_signal_[signum]; 638 } else { 639 return false; 640 } 641 } 642 643 // Clears the given signal number. 644 void ClearSignal(int signum) { 645 ASSERT(signum < ARRAY_SIZE(received_signal_)); 646 if (signum < ARRAY_SIZE(received_signal_)) { 647 received_signal_[signum] = false; 648 } 649 } 650 651 // Returns the file descriptor to monitor for signal events. 
652 int GetDescriptor() const { 653 return afd_[0]; 654 } 655 656 // This is called directly from our real signal handler, so it must be 657 // signal-handler-safe. That means it cannot assume anything about the 658 // user-level state of the process, since the handler could be executed at any 659 // time on any thread. 660 void OnPosixSignalReceived(int signum) { 661 if (signum >= ARRAY_SIZE(received_signal_)) { 662 // We don't have space in our array for this. 663 return; 664 } 665 // Set a flag saying we've seen this signal. 666 received_signal_[signum] = true; 667 // Notify application code that we got a signal. 668 const uint8 b[1] = { 0 }; 669 if (-1 == write(afd_[1], b, sizeof(b))) { 670 // Nothing we can do here. If there's an error somehow then there's 671 // nothing we can safely do from a signal handler. 672 // No, we can't even safely log it. 673 // But, we still have to check the return value here. Otherwise, 674 // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help. 675 return; 676 } 677 } 678 679 private: 680 PosixSignalHandler() { 681 if (pipe(afd_) < 0) { 682 LOG_ERR(LS_ERROR) << "pipe failed"; 683 return; 684 } 685 if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) { 686 LOG_ERR(LS_WARNING) << "fcntl #1 failed"; 687 } 688 if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) { 689 LOG_ERR(LS_WARNING) << "fcntl #2 failed"; 690 } 691 memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)), 692 0, 693 sizeof(received_signal_)); 694 } 695 696 ~PosixSignalHandler() { 697 int fd1 = afd_[0]; 698 int fd2 = afd_[1]; 699 // We clobber the stored file descriptor numbers here or else in principle 700 // a signal that happens to be delivered during application termination 701 // could erroneously write a zero byte to an unrelated file handle in 702 // OnPosixSignalReceived() if some other file happens to be opened later 703 // during shutdown and happens to be given the same file descriptor number 704 // as our pipe had. 
Unfortunately even with this precaution there is still a 705 // race where that could occur if said signal happens to be handled 706 // concurrently with this code and happens to have already read the value of 707 // afd_[1] from memory before we clobber it, but that's unlikely. 708 afd_[0] = -1; 709 afd_[1] = -1; 710 close(fd1); 711 close(fd2); 712 } 713 714 int afd_[2]; 715 // These are boolean flags that will be set in our signal handler and read 716 // and cleared from Wait(). There is a race involved in this, but it is 717 // benign. The signal handler sets the flag before signaling the pipe, so 718 // we'll never end up blocking in select() while a flag is still true. 719 // However, if two of the same signal arrive close to each other then it's 720 // possible that the second time the handler may set the flag while it's still 721 // true, meaning that signal will be missed. But the first occurrence of it 722 // will still be handled, so this isn't a problem. 723 // Volatile is not necessary here for correctness, but this data _is_ volatile 724 // so I've marked it as such. 725 volatile uint8 received_signal_[kNumPosixSignals]; 726}; 727 728class PosixSignalDispatcher : public Dispatcher { 729 public: 730 PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) { 731 owner_->Add(this); 732 } 733 734 virtual ~PosixSignalDispatcher() { 735 owner_->Remove(this); 736 } 737 738 virtual uint32 GetRequestedEvents() { 739 return DE_READ; 740 } 741 742 virtual void OnPreEvent(uint32 ff) { 743 // Events might get grouped if signals come very fast, so we read out up to 744 // 16 bytes to make sure we keep the pipe empty. 
745 uint8 b[16]; 746 ssize_t ret = read(GetDescriptor(), b, sizeof(b)); 747 if (ret < 0) { 748 LOG_ERR(LS_WARNING) << "Error in read()"; 749 } else if (ret == 0) { 750 LOG(LS_WARNING) << "Should have read at least one byte"; 751 } 752 } 753 754 virtual void OnEvent(uint32 ff, int err) { 755 for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals; 756 ++signum) { 757 if (PosixSignalHandler::Instance()->IsSignalSet(signum)) { 758 PosixSignalHandler::Instance()->ClearSignal(signum); 759 HandlerMap::iterator i = handlers_.find(signum); 760 if (i == handlers_.end()) { 761 // This can happen if a signal is delivered to our process at around 762 // the same time as we unset our handler for it. It is not an error 763 // condition, but it's unusual enough to be worth logging. 764 LOG(LS_INFO) << "Received signal with no handler: " << signum; 765 } else { 766 // Otherwise, execute our handler. 767 (*i->second)(signum); 768 } 769 } 770 } 771 } 772 773 virtual int GetDescriptor() { 774 return PosixSignalHandler::Instance()->GetDescriptor(); 775 } 776 777 virtual bool IsDescriptorClosed() { 778 return false; 779 } 780 781 void SetHandler(int signum, void (*handler)(int)) { 782 handlers_[signum] = handler; 783 } 784 785 void ClearHandler(int signum) { 786 handlers_.erase(signum); 787 } 788 789 bool HasHandlers() { 790 return !handlers_.empty(); 791 } 792 793 private: 794 typedef std::map<int, void (*)(int)> HandlerMap; 795 796 HandlerMap handlers_; 797 // Our owner. 
798 PhysicalSocketServer *owner_; 799}; 800 801class SocketDispatcher : public Dispatcher, public PhysicalSocket { 802 public: 803 explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) { 804 } 805 SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) { 806 } 807 808 virtual ~SocketDispatcher() { 809 Close(); 810 } 811 812 bool Initialize() { 813 ss_->Add(this); 814 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK); 815 return true; 816 } 817 818 virtual bool Create(int type) { 819 return Create(AF_INET, type); 820 } 821 822 virtual bool Create(int family, int type) { 823 // Change the socket to be non-blocking. 824 if (!PhysicalSocket::Create(family, type)) 825 return false; 826 827 return Initialize(); 828 } 829 830 virtual int GetDescriptor() { 831 return s_; 832 } 833 834 virtual bool IsDescriptorClosed() { 835 // We don't have a reliable way of distinguishing end-of-stream 836 // from readability. So test on each readable call. Is this 837 // inefficient? Probably. 838 char ch; 839 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK); 840 if (res > 0) { 841 // Data available, so not closed. 842 return false; 843 } else if (res == 0) { 844 // EOF, so closed. 845 return true; 846 } else { // error 847 switch (errno) { 848 // Returned if we've already closed s_. 849 case EBADF: 850 // Returned during ungraceful peer shutdown. 851 case ECONNRESET: 852 return true; 853 default: 854 // Assume that all other errors are just blocking errors, meaning the 855 // connection is still good but we just can't read from it right now. 856 // This should only happen when connecting (and at most once), because 857 // in all other cases this function is only called if the file 858 // descriptor is already known to be in the readable state. 
However, 859 // it's not necessary a problem if we spuriously interpret a 860 // "connection lost"-type error as a blocking error, because typically 861 // the next recv() will get EOF, so we'll still eventually notice that 862 // the socket is closed. 863 LOG_ERR(LS_WARNING) << "Assuming benign blocking error"; 864 return false; 865 } 866 } 867 } 868 869 virtual uint32 GetRequestedEvents() { 870 return enabled_events_; 871 } 872 873 virtual void OnPreEvent(uint32 ff) { 874 if ((ff & DE_CONNECT) != 0) 875 state_ = CS_CONNECTED; 876 if ((ff & DE_CLOSE) != 0) 877 state_ = CS_CLOSED; 878 } 879 880 virtual void OnEvent(uint32 ff, int err) { 881 // Make sure we deliver connect/accept first. Otherwise, consumers may see 882 // something like a READ followed by a CONNECT, which would be odd. 883 if ((ff & DE_CONNECT) != 0) { 884 enabled_events_ &= ~DE_CONNECT; 885 SignalConnectEvent(this); 886 } 887 if ((ff & DE_ACCEPT) != 0) { 888 enabled_events_ &= ~DE_ACCEPT; 889 SignalReadEvent(this); 890 } 891 if ((ff & DE_READ) != 0) { 892 enabled_events_ &= ~DE_READ; 893 SignalReadEvent(this); 894 } 895 if ((ff & DE_WRITE) != 0) { 896 enabled_events_ &= ~DE_WRITE; 897 SignalWriteEvent(this); 898 } 899 if ((ff & DE_CLOSE) != 0) { 900 // The socket is now dead to us, so stop checking it. 
901 enabled_events_ = 0; 902 SignalCloseEvent(this, err); 903 } 904 } 905 906 virtual int Close() { 907 if (s_ == INVALID_SOCKET) 908 return 0; 909 910 ss_->Remove(this); 911 return PhysicalSocket::Close(); 912 } 913}; 914 915class FileDispatcher: public Dispatcher, public AsyncFile { 916 public: 917 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) { 918 set_readable(true); 919 920 ss_->Add(this); 921 922 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK); 923 } 924 925 virtual ~FileDispatcher() { 926 ss_->Remove(this); 927 } 928 929 SocketServer* socketserver() { return ss_; } 930 931 virtual int GetDescriptor() { 932 return fd_; 933 } 934 935 virtual bool IsDescriptorClosed() { 936 return false; 937 } 938 939 virtual uint32 GetRequestedEvents() { 940 return flags_; 941 } 942 943 virtual void OnPreEvent(uint32 ff) { 944 } 945 946 virtual void OnEvent(uint32 ff, int err) { 947 if ((ff & DE_READ) != 0) 948 SignalReadEvent(this); 949 if ((ff & DE_WRITE) != 0) 950 SignalWriteEvent(this); 951 if ((ff & DE_CLOSE) != 0) 952 SignalCloseEvent(this, err); 953 } 954 955 virtual bool readable() { 956 return (flags_ & DE_READ) != 0; 957 } 958 959 virtual void set_readable(bool value) { 960 flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ); 961 } 962 963 virtual bool writable() { 964 return (flags_ & DE_WRITE) != 0; 965 } 966 967 virtual void set_writable(bool value) { 968 flags_ = value ? 
(flags_ | DE_WRITE) : (flags_ & ~DE_WRITE); 969 } 970 971 private: 972 PhysicalSocketServer* ss_; 973 int fd_; 974 int flags_; 975}; 976 977AsyncFile* PhysicalSocketServer::CreateFile(int fd) { 978 return new FileDispatcher(fd, this); 979} 980 981#endif // POSIX 982 983#ifdef WIN32 984static uint32 FlagsToEvents(uint32 events) { 985 uint32 ffFD = FD_CLOSE; 986 if (events & DE_READ) 987 ffFD |= FD_READ; 988 if (events & DE_WRITE) 989 ffFD |= FD_WRITE; 990 if (events & DE_CONNECT) 991 ffFD |= FD_CONNECT; 992 if (events & DE_ACCEPT) 993 ffFD |= FD_ACCEPT; 994 return ffFD; 995} 996 997class EventDispatcher : public Dispatcher { 998 public: 999 EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) { 1000 hev_ = WSACreateEvent(); 1001 if (hev_) { 1002 ss_->Add(this); 1003 } 1004 } 1005 1006 ~EventDispatcher() { 1007 if (hev_ != NULL) { 1008 ss_->Remove(this); 1009 WSACloseEvent(hev_); 1010 hev_ = NULL; 1011 } 1012 } 1013 1014 virtual void Signal() { 1015 if (hev_ != NULL) 1016 WSASetEvent(hev_); 1017 } 1018 1019 virtual uint32 GetRequestedEvents() { 1020 return 0; 1021 } 1022 1023 virtual void OnPreEvent(uint32 ff) { 1024 WSAResetEvent(hev_); 1025 } 1026 1027 virtual void OnEvent(uint32 ff, int err) { 1028 } 1029 1030 virtual WSAEVENT GetWSAEvent() { 1031 return hev_; 1032 } 1033 1034 virtual SOCKET GetSocket() { 1035 return INVALID_SOCKET; 1036 } 1037 1038 virtual bool CheckSignalClose() { return false; } 1039 1040private: 1041 PhysicalSocketServer* ss_; 1042 WSAEVENT hev_; 1043}; 1044 1045class SocketDispatcher : public Dispatcher, public PhysicalSocket { 1046 public: 1047 static int next_id_; 1048 int id_; 1049 bool signal_close_; 1050 int signal_err_; 1051 1052 SocketDispatcher(PhysicalSocketServer* ss) 1053 : PhysicalSocket(ss), 1054 id_(0), 1055 signal_close_(false) { 1056 } 1057 1058 SocketDispatcher(SOCKET s, PhysicalSocketServer* ss) 1059 : PhysicalSocket(ss, s), 1060 id_(0), 1061 signal_close_(false) { 1062 } 1063 1064 virtual ~SocketDispatcher() { 1065 Close(); 
1066 } 1067 1068 bool Initialize() { 1069 ASSERT(s_ != INVALID_SOCKET); 1070 // Must be a non-blocking 1071 u_long argp = 1; 1072 ioctlsocket(s_, FIONBIO, &argp); 1073 ss_->Add(this); 1074 return true; 1075 } 1076 1077 virtual bool Create(int type) { 1078 return Create(AF_INET, type); 1079 } 1080 1081 virtual bool Create(int family, int type) { 1082 // Create socket 1083 if (!PhysicalSocket::Create(family, type)) 1084 return false; 1085 1086 if (!Initialize()) 1087 return false; 1088 1089 do { id_ = ++next_id_; } while (id_ == 0); 1090 return true; 1091 } 1092 1093 virtual int Close() { 1094 if (s_ == INVALID_SOCKET) 1095 return 0; 1096 1097 id_ = 0; 1098 signal_close_ = false; 1099 ss_->Remove(this); 1100 return PhysicalSocket::Close(); 1101 } 1102 1103 virtual uint32 GetRequestedEvents() { 1104 return enabled_events_; 1105 } 1106 1107 virtual void OnPreEvent(uint32 ff) { 1108 if ((ff & DE_CONNECT) != 0) 1109 state_ = CS_CONNECTED; 1110 // We set CS_CLOSED from CheckSignalClose. 1111 } 1112 1113 virtual void OnEvent(uint32 ff, int err) { 1114 int cache_id = id_; 1115 // Make sure we deliver connect/accept first. Otherwise, consumers may see 1116 // something like a READ followed by a CONNECT, which would be odd. 
1117 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { 1118 if (ff != DE_CONNECT) 1119 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; 1120 enabled_events_ &= ~DE_CONNECT; 1121#ifdef _DEBUG 1122 dbg_addr_ = "Connected @ "; 1123 dbg_addr_.append(GetRemoteAddress().ToString()); 1124#endif // _DEBUG 1125 SignalConnectEvent(this); 1126 } 1127 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { 1128 enabled_events_ &= ~DE_ACCEPT; 1129 SignalReadEvent(this); 1130 } 1131 if ((ff & DE_READ) != 0) { 1132 enabled_events_ &= ~DE_READ; 1133 SignalReadEvent(this); 1134 } 1135 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { 1136 enabled_events_ &= ~DE_WRITE; 1137 SignalWriteEvent(this); 1138 } 1139 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { 1140 signal_close_ = true; 1141 signal_err_ = err; 1142 } 1143 } 1144 1145 virtual WSAEVENT GetWSAEvent() { 1146 return WSA_INVALID_EVENT; 1147 } 1148 1149 virtual SOCKET GetSocket() { 1150 return s_; 1151 } 1152 1153 virtual bool CheckSignalClose() { 1154 if (!signal_close_) 1155 return false; 1156 1157 char ch; 1158 if (recv(s_, &ch, 1, MSG_PEEK) > 0) 1159 return false; 1160 1161 state_ = CS_CLOSED; 1162 signal_close_ = false; 1163 SignalCloseEvent(this, signal_err_); 1164 return true; 1165 } 1166}; 1167 1168int SocketDispatcher::next_id_ = 0; 1169 1170#endif // WIN32 1171 1172// Sets the value of a boolean value to false when signaled. 
1173class Signaler : public EventDispatcher { 1174 public: 1175 Signaler(PhysicalSocketServer* ss, bool* pf) 1176 : EventDispatcher(ss), pf_(pf) { 1177 } 1178 virtual ~Signaler() { } 1179 1180 void OnEvent(uint32 ff, int err) { 1181 if (pf_) 1182 *pf_ = false; 1183 } 1184 1185 private: 1186 bool *pf_; 1187}; 1188 1189PhysicalSocketServer::PhysicalSocketServer() 1190 : fWait_(false), 1191 last_tick_tracked_(0), 1192 last_tick_dispatch_count_(0) { 1193 signal_wakeup_ = new Signaler(this, &fWait_); 1194#ifdef WIN32 1195 socket_ev_ = WSACreateEvent(); 1196#endif 1197} 1198 1199PhysicalSocketServer::~PhysicalSocketServer() { 1200#ifdef WIN32 1201 WSACloseEvent(socket_ev_); 1202#endif 1203#ifdef POSIX 1204 signal_dispatcher_.reset(); 1205#endif 1206 delete signal_wakeup_; 1207 ASSERT(dispatchers_.empty()); 1208} 1209 1210void PhysicalSocketServer::WakeUp() { 1211 signal_wakeup_->Signal(); 1212} 1213 1214Socket* PhysicalSocketServer::CreateSocket(int type) { 1215 return CreateSocket(AF_INET, type); 1216} 1217 1218Socket* PhysicalSocketServer::CreateSocket(int family, int type) { 1219 PhysicalSocket* socket = new PhysicalSocket(this); 1220 if (socket->Create(family, type)) { 1221 return socket; 1222 } else { 1223 delete socket; 1224 return 0; 1225 } 1226} 1227 1228AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) { 1229 return CreateAsyncSocket(AF_INET, type); 1230} 1231 1232AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) { 1233 SocketDispatcher* dispatcher = new SocketDispatcher(this); 1234 if (dispatcher->Create(family, type)) { 1235 return dispatcher; 1236 } else { 1237 delete dispatcher; 1238 return 0; 1239 } 1240} 1241 1242AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { 1243 SocketDispatcher* dispatcher = new SocketDispatcher(s, this); 1244 if (dispatcher->Initialize()) { 1245 return dispatcher; 1246 } else { 1247 delete dispatcher; 1248 return 0; 1249 } 1250} 1251 1252void PhysicalSocketServer::Add(Dispatcher 
*pdispatcher) { 1253 CritScope cs(&crit_); 1254 // Prevent duplicates. This can cause dead dispatchers to stick around. 1255 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1256 dispatchers_.end(), 1257 pdispatcher); 1258 if (pos != dispatchers_.end()) 1259 return; 1260 dispatchers_.push_back(pdispatcher); 1261} 1262 1263void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { 1264 CritScope cs(&crit_); 1265 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1266 dispatchers_.end(), 1267 pdispatcher); 1268 ASSERT(pos != dispatchers_.end()); 1269 size_t index = pos - dispatchers_.begin(); 1270 dispatchers_.erase(pos); 1271 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); 1272 ++it) { 1273 if (index < **it) { 1274 --**it; 1275 } 1276 } 1277} 1278 1279#ifdef POSIX 1280bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1281 // Calculate timing information 1282 1283 struct timeval *ptvWait = NULL; 1284 struct timeval tvWait; 1285 struct timeval tvStop; 1286 if (cmsWait != kForever) { 1287 // Calculate wait timeval 1288 tvWait.tv_sec = cmsWait / 1000; 1289 tvWait.tv_usec = (cmsWait % 1000) * 1000; 1290 ptvWait = &tvWait; 1291 1292 // Calculate when to return in a timeval 1293 gettimeofday(&tvStop, NULL); 1294 tvStop.tv_sec += tvWait.tv_sec; 1295 tvStop.tv_usec += tvWait.tv_usec; 1296 if (tvStop.tv_usec >= 1000000) { 1297 tvStop.tv_usec -= 1000000; 1298 tvStop.tv_sec += 1; 1299 } 1300 } 1301 1302 // Zero all fd_sets. 
Don't need to do this inside the loop since 1303 // select() zeros the descriptors not signaled 1304 1305 fd_set fdsRead; 1306 FD_ZERO(&fdsRead); 1307 fd_set fdsWrite; 1308 FD_ZERO(&fdsWrite); 1309 1310 fWait_ = true; 1311 1312 while (fWait_) { 1313 int fdmax = -1; 1314 { 1315 CritScope cr(&crit_); 1316 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1317 // Query dispatchers for read and write wait state 1318 Dispatcher *pdispatcher = dispatchers_[i]; 1319 ASSERT(pdispatcher); 1320 if (!process_io && (pdispatcher != signal_wakeup_)) 1321 continue; 1322 int fd = pdispatcher->GetDescriptor(); 1323 if (fd > fdmax) 1324 fdmax = fd; 1325 1326 uint32 ff = pdispatcher->GetRequestedEvents(); 1327 if (ff & (DE_READ | DE_ACCEPT)) 1328 FD_SET(fd, &fdsRead); 1329 if (ff & (DE_WRITE | DE_CONNECT)) 1330 FD_SET(fd, &fdsWrite); 1331 } 1332 } 1333 1334 // Wait then call handlers as appropriate 1335 // < 0 means error 1336 // 0 means timeout 1337 // > 0 means count of descriptors ready 1338 int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait); 1339 1340 // If error, return error. 1341 if (n < 0) { 1342 if (errno != EINTR) { 1343 LOG_E(LS_ERROR, EN, errno) << "select"; 1344 return false; 1345 } 1346 // Else ignore the error and keep going. If this EINTR was for one of the 1347 // signals managed by this PhysicalSocketServer, the 1348 // PosixSignalDeliveryDispatcher will be in the signaled state in the next 1349 // iteration. 1350 } else if (n == 0) { 1351 // If timeout, return success 1352 return true; 1353 } else { 1354 // We have signaled descriptors 1355 CritScope cr(&crit_); 1356 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1357 Dispatcher *pdispatcher = dispatchers_[i]; 1358 int fd = pdispatcher->GetDescriptor(); 1359 uint32 ff = 0; 1360 int errcode = 0; 1361 1362 // Reap any error code, which can be signaled through reads or writes. 1363 // TODO: Should we set errcode if getsockopt fails? 
1364 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1365 socklen_t len = sizeof(errcode); 1366 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); 1367 } 1368 1369 // Check readable descriptors. If we're waiting on an accept, signal 1370 // that. Otherwise we're waiting for data, check to see if we're 1371 // readable or really closed. 1372 // TODO: Only peek at TCP descriptors. 1373 if (FD_ISSET(fd, &fdsRead)) { 1374 FD_CLR(fd, &fdsRead); 1375 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { 1376 ff |= DE_ACCEPT; 1377 } else if (errcode || pdispatcher->IsDescriptorClosed()) { 1378 ff |= DE_CLOSE; 1379 } else { 1380 ff |= DE_READ; 1381 } 1382 } 1383 1384 // Check writable descriptors. If we're waiting on a connect, detect 1385 // success versus failure by the reaped error code. 1386 if (FD_ISSET(fd, &fdsWrite)) { 1387 FD_CLR(fd, &fdsWrite); 1388 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) { 1389 if (!errcode) { 1390 ff |= DE_CONNECT; 1391 } else { 1392 ff |= DE_CLOSE; 1393 } 1394 } else { 1395 ff |= DE_WRITE; 1396 } 1397 } 1398 1399 // Tell the descriptor about the event. 1400 if (ff != 0) { 1401 pdispatcher->OnPreEvent(ff); 1402 pdispatcher->OnEvent(ff, errcode); 1403 } 1404 } 1405 } 1406 1407 // Recalc the time remaining to wait. 
Doing it here means it doesn't get 1408 // calced twice the first time through the loop 1409 if (ptvWait) { 1410 ptvWait->tv_sec = 0; 1411 ptvWait->tv_usec = 0; 1412 struct timeval tvT; 1413 gettimeofday(&tvT, NULL); 1414 if ((tvStop.tv_sec > tvT.tv_sec) 1415 || ((tvStop.tv_sec == tvT.tv_sec) 1416 && (tvStop.tv_usec > tvT.tv_usec))) { 1417 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; 1418 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; 1419 if (ptvWait->tv_usec < 0) { 1420 ASSERT(ptvWait->tv_sec > 0); 1421 ptvWait->tv_usec += 1000000; 1422 ptvWait->tv_sec -= 1; 1423 } 1424 } 1425 } 1426 } 1427 1428 return true; 1429} 1430 1431static void GlobalSignalHandler(int signum) { 1432 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); 1433} 1434 1435bool PhysicalSocketServer::SetPosixSignalHandler(int signum, 1436 void (*handler)(int)) { 1437 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, 1438 // otherwise set one. 1439 if (handler == SIG_IGN || handler == SIG_DFL) { 1440 if (!InstallSignal(signum, handler)) { 1441 return false; 1442 } 1443 if (signal_dispatcher_) { 1444 signal_dispatcher_->ClearHandler(signum); 1445 if (!signal_dispatcher_->HasHandlers()) { 1446 signal_dispatcher_.reset(); 1447 } 1448 } 1449 } else { 1450 if (!signal_dispatcher_) { 1451 signal_dispatcher_.reset(new PosixSignalDispatcher(this)); 1452 } 1453 signal_dispatcher_->SetHandler(signum, handler); 1454 if (!InstallSignal(signum, &GlobalSignalHandler)) { 1455 return false; 1456 } 1457 } 1458 return true; 1459} 1460 1461Dispatcher* PhysicalSocketServer::signal_dispatcher() { 1462 return signal_dispatcher_.get(); 1463} 1464 1465bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) { 1466 struct sigaction act; 1467 // It doesn't really matter what we set this mask to. 
1468 if (sigemptyset(&act.sa_mask) != 0) { 1469 LOG_ERR(LS_ERROR) << "Couldn't set mask"; 1470 return false; 1471 } 1472 act.sa_handler = handler; 1473 // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it 1474 // and it's a nuisance. Though some syscalls still return EINTR and there's no 1475 // real standard for which ones. :( 1476 act.sa_flags = SA_RESTART; 1477 if (sigaction(signum, &act, NULL) != 0) { 1478 LOG_ERR(LS_ERROR) << "Couldn't set sigaction"; 1479 return false; 1480 } 1481 return true; 1482} 1483#endif // POSIX 1484 1485#ifdef WIN32 1486bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1487 int cmsTotal = cmsWait; 1488 int cmsElapsed = 0; 1489 uint32 msStart = Time(); 1490 1491#if LOGGING 1492 if (last_tick_dispatch_count_ == 0) { 1493 last_tick_tracked_ = msStart; 1494 } 1495#endif 1496 1497 fWait_ = true; 1498 while (fWait_) { 1499 std::vector<WSAEVENT> events; 1500 std::vector<Dispatcher *> event_owners; 1501 1502 events.push_back(socket_ev_); 1503 1504 { 1505 CritScope cr(&crit_); 1506 size_t i = 0; 1507 iterators_.push_back(&i); 1508 // Don't track dispatchers_.size(), because we want to pick up any new 1509 // dispatchers that were added while processing the loop. 1510 while (i < dispatchers_.size()) { 1511 Dispatcher* disp = dispatchers_[i++]; 1512 if (!process_io && (disp != signal_wakeup_)) 1513 continue; 1514 SOCKET s = disp->GetSocket(); 1515 if (disp->CheckSignalClose()) { 1516 // We just signalled close, don't poll this socket 1517 } else if (s != INVALID_SOCKET) { 1518 WSAEventSelect(s, 1519 events[0], 1520 FlagsToEvents(disp->GetRequestedEvents())); 1521 } else { 1522 events.push_back(disp->GetWSAEvent()); 1523 event_owners.push_back(disp); 1524 } 1525 } 1526 ASSERT(iterators_.back() == &i); 1527 iterators_.pop_back(); 1528 } 1529 1530 // Which is shorter, the delay wait or the asked wait? 
1531 1532 int cmsNext; 1533 if (cmsWait == kForever) { 1534 cmsNext = cmsWait; 1535 } else { 1536 cmsNext = _max(0, cmsTotal - cmsElapsed); 1537 } 1538 1539 // Wait for one of the events to signal 1540 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), 1541 &events[0], 1542 false, 1543 cmsNext, 1544 false); 1545 1546#if 0 // LOGGING 1547 // we track this information purely for logging purposes. 1548 last_tick_dispatch_count_++; 1549 if (last_tick_dispatch_count_ >= 1000) { 1550 int32 elapsed = TimeSince(last_tick_tracked_); 1551 LOG(INFO) << "PhysicalSocketServer took " << elapsed 1552 << "ms for 1000 events"; 1553 1554 // If we get more than 1000 events in a second, we are spinning badly 1555 // (normally it should take about 8-20 seconds). 1556 ASSERT(elapsed > 1000); 1557 1558 last_tick_tracked_ = Time(); 1559 last_tick_dispatch_count_ = 0; 1560 } 1561#endif 1562 1563 if (dw == WSA_WAIT_FAILED) { 1564 // Failed? 1565 // TODO: need a better strategy than this! 1566 int error = WSAGetLastError(); 1567 ASSERT(false); 1568 return false; 1569 } else if (dw == WSA_WAIT_TIMEOUT) { 1570 // Timeout? 1571 return true; 1572 } else { 1573 // Figure out which one it is and call it 1574 CritScope cr(&crit_); 1575 int index = dw - WSA_WAIT_EVENT_0; 1576 if (index > 0) { 1577 --index; // The first event is the socket event 1578 event_owners[index]->OnPreEvent(0); 1579 event_owners[index]->OnEvent(0, 0); 1580 } else if (process_io) { 1581 size_t i = 0, end = dispatchers_.size(); 1582 iterators_.push_back(&i); 1583 iterators_.push_back(&end); // Don't iterate over new dispatchers. 
1584 while (i < end) { 1585 Dispatcher* disp = dispatchers_[i++]; 1586 SOCKET s = disp->GetSocket(); 1587 if (s == INVALID_SOCKET) 1588 continue; 1589 1590 WSANETWORKEVENTS wsaEvents; 1591 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents); 1592 if (err == 0) { 1593 1594#if LOGGING 1595 { 1596 if ((wsaEvents.lNetworkEvents & FD_READ) && 1597 wsaEvents.iErrorCode[FD_READ_BIT] != 0) { 1598 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error " 1599 << wsaEvents.iErrorCode[FD_READ_BIT]; 1600 } 1601 if ((wsaEvents.lNetworkEvents & FD_WRITE) && 1602 wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) { 1603 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error " 1604 << wsaEvents.iErrorCode[FD_WRITE_BIT]; 1605 } 1606 if ((wsaEvents.lNetworkEvents & FD_CONNECT) && 1607 wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) { 1608 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error " 1609 << wsaEvents.iErrorCode[FD_CONNECT_BIT]; 1610 } 1611 if ((wsaEvents.lNetworkEvents & FD_ACCEPT) && 1612 wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) { 1613 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error " 1614 << wsaEvents.iErrorCode[FD_ACCEPT_BIT]; 1615 } 1616 if ((wsaEvents.lNetworkEvents & FD_CLOSE) && 1617 wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) { 1618 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error " 1619 << wsaEvents.iErrorCode[FD_CLOSE_BIT]; 1620 } 1621 } 1622#endif 1623 uint32 ff = 0; 1624 int errcode = 0; 1625 if (wsaEvents.lNetworkEvents & FD_READ) 1626 ff |= DE_READ; 1627 if (wsaEvents.lNetworkEvents & FD_WRITE) 1628 ff |= DE_WRITE; 1629 if (wsaEvents.lNetworkEvents & FD_CONNECT) { 1630 if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) { 1631 ff |= DE_CONNECT; 1632 } else { 1633 ff |= DE_CLOSE; 1634 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT]; 1635 } 1636 } 1637 if (wsaEvents.lNetworkEvents & FD_ACCEPT) 1638 ff |= DE_ACCEPT; 1639 if (wsaEvents.lNetworkEvents & FD_CLOSE) { 1640 ff |= DE_CLOSE; 1641 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT]; 
1642 } 1643 if (ff != 0) { 1644 disp->OnPreEvent(ff); 1645 disp->OnEvent(ff, errcode); 1646 } 1647 } 1648 } 1649 ASSERT(iterators_.back() == &end); 1650 iterators_.pop_back(); 1651 ASSERT(iterators_.back() == &i); 1652 iterators_.pop_back(); 1653 } 1654 1655 // Reset the network event until new activity occurs 1656 WSAResetEvent(socket_ev_); 1657 } 1658 1659 // Break? 1660 if (!fWait_) 1661 break; 1662 cmsElapsed = TimeSince(msStart); 1663 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) { 1664 break; 1665 } 1666 } 1667 1668 // Done 1669 return true; 1670} 1671#endif // WIN32 1672 1673} // namespace talk_base 1674