1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#if defined(_MSC_VER) && _MSC_VER < 1300
29#pragma warning(disable:4786)
30#endif
31
32#include <cassert>
33
34#ifdef POSIX
35#include <string.h>
36#include <errno.h>
37#include <fcntl.h>
38#include <sys/time.h>
39#include <unistd.h>
40#include <signal.h>
41#endif
42
43#ifdef WIN32
44#define WIN32_LEAN_AND_MEAN
45#include <windows.h>
46#include <winsock2.h>
47#include <ws2tcpip.h>
48#undef SetPort
49#endif
50
51#include <algorithm>
52#include <map>
53
54#include "talk/base/basictypes.h"
55#include "talk/base/byteorder.h"
56#include "talk/base/common.h"
57#include "talk/base/logging.h"
58#include "talk/base/nethelpers.h"
59#include "talk/base/physicalsocketserver.h"
60#include "talk/base/time.h"
61#include "talk/base/winping.h"
62#include "talk/base/win32socketinit.h"
63
64// stm: this will tell us if we are on OSX
65#ifdef HAVE_CONFIG_H
66#include "config.h"
67#endif
68
69#ifdef POSIX
70#include <netinet/tcp.h>  // for TCP_NODELAY
71#define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
72typedef void* SockOptArg;
73#endif  // POSIX
74
75#ifdef WIN32
76typedef char* SockOptArg;
77#endif
78
79namespace talk_base {
80
// Standard MTUs, from RFC 1191, listed from largest to smallest. Entries that
// are commented out are intentionally not probed. The table is terminated by
// a 0 entry; EstimateMTU() walks it from the top and stops before that marker.
const uint16 PACKET_MAXIMUMS[] = {
  65535,    // Theoretical maximum, Hyperchannel
  32000,    // Nothing
  17914,    // 16Mb IBM Token Ring
  8166,     // IEEE 802.4
  //4464,   // IEEE 802.5 (4Mb max)
  4352,     // FDDI
  //2048,   // Wideband Network
  2002,     // IEEE 802.5 (4Mb recommended)
  //1536,   // Experimental Ethernet Networks
  //1500,   // Ethernet, Point-to-Point (default)
  1492,     // IEEE 802.3
  1006,     // SLIP, ARPANET
  //576,    // X.25 Networks
  //544,    // DEC IP Portal
  //512,    // NETBIOS
  508,      // IEEE 802/Source-Rt Bridge, ARCNET
  296,      // Point-to-Point (low delay)
  68,       // Official minimum
  0,        // End of list marker
};

// Header sizes subtracted from a candidate MTU to obtain the ICMP echo payload
// size used when probing (see EstimateMTU()). IP_HEADER_SIZE is the size of an
// IPv4 header without options.
const uint32 IP_HEADER_SIZE = 20;
const uint32 ICMP_HEADER_SIZE = 8;
106
107class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
108 public:
109  PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET)
110    : ss_(ss), s_(s), enabled_events_(0), error_(0),
111      state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
112      resolver_(NULL) {
113#ifdef WIN32
114    // EnsureWinsockInit() ensures that winsock is initialized. The default
115    // version of this function doesn't do anything because winsock is
116    // initialized by constructor of a static object. If neccessary libjingle
117    // users can link it with a different version of this function by replacing
118    // win32socketinit.cc. See win32socketinit.cc for more details.
119    EnsureWinsockInit();
120#endif
121    if (s_ != INVALID_SOCKET) {
122      enabled_events_ = DE_READ | DE_WRITE;
123
124      int type = SOCK_STREAM;
125      socklen_t len = sizeof(type);
126      VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
127      udp_ = (SOCK_DGRAM == type);
128    }
129  }
130
131  virtual ~PhysicalSocket() {
132    Close();
133  }
134
135  // Creates the underlying OS socket (same as the "socket" function).
136  virtual bool Create(int type) {
137    Close();
138    s_ = ::socket(AF_INET, type, 0);
139    udp_ = (SOCK_DGRAM == type);
140    UpdateLastError();
141    if (udp_)
142      enabled_events_ = DE_READ | DE_WRITE;
143    return s_ != INVALID_SOCKET;
144  }
145
146  SocketAddress GetLocalAddress() const {
147    sockaddr_in addr;
148    socklen_t addrlen = sizeof(addr);
149    int result = ::getsockname(s_, (sockaddr*)&addr, &addrlen);
150    SocketAddress address;
151    if (result >= 0) {
152      ASSERT(addrlen == sizeof(addr));
153      address.FromSockAddr(addr);
154    } else {
155      LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
156                      << s_;
157    }
158    return address;
159  }
160
161  SocketAddress GetRemoteAddress() const {
162    sockaddr_in addr;
163    socklen_t addrlen = sizeof(addr);
164    int result = ::getpeername(s_, (sockaddr*)&addr, &addrlen);
165    SocketAddress address;
166    if (result >= 0) {
167      ASSERT(addrlen == sizeof(addr));
168      address.FromSockAddr(addr);
169    } else {
170      LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
171                      << s_;
172    }
173    return address;
174  }
175
176  int Bind(const SocketAddress& addr) {
177    sockaddr_in saddr;
178    addr.ToSockAddr(&saddr);
179    int err = ::bind(s_, (sockaddr*)&saddr, sizeof(saddr));
180    UpdateLastError();
181#ifdef _DEBUG
182    if (0 == err) {
183      dbg_addr_ = "Bound @ ";
184      dbg_addr_.append(GetLocalAddress().ToString());
185    }
186#endif  // _DEBUG
187    return err;
188  }
189
190  int Connect(const SocketAddress& addr) {
191    // TODO: Implicit creation is required to reconnect...
192    // ...but should we make it more explicit?
193    if ((s_ == INVALID_SOCKET) && !Create(SOCK_STREAM))
194      return SOCKET_ERROR;
195    if (addr.IsUnresolved()) {
196      if (state_ != CS_CLOSED) {
197        SetError(EALREADY);
198        return SOCKET_ERROR;
199      }
200
201      LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
202      resolver_ = new AsyncResolver();
203      resolver_->set_address(addr);
204      resolver_->SignalWorkDone.connect(this, &PhysicalSocket::OnResolveResult);
205      resolver_->Start();
206      state_ = CS_CONNECTING;
207      return 0;
208    }
209
210    return DoConnect(addr);
211  }
212
213  int DoConnect(const SocketAddress& addr) {
214    sockaddr_in saddr;
215    addr.ToSockAddr(&saddr);
216    int err = ::connect(s_, (sockaddr*)&saddr, sizeof(saddr));
217    UpdateLastError();
218    if (err == 0) {
219      state_ = CS_CONNECTED;
220    } else if (IsBlockingError(error_)) {
221      state_ = CS_CONNECTING;
222      enabled_events_ |= DE_CONNECT;
223    } else {
224      return SOCKET_ERROR;
225    }
226
227    enabled_events_ |= DE_READ | DE_WRITE;
228    return 0;
229  }
230
231  int GetError() const {
232    return error_;
233  }
234
235  void SetError(int error) {
236    error_ = error;
237  }
238
239  ConnState GetState() const {
240    return state_;
241  }
242
243  int GetOption(Option opt, int* value) {
244    int slevel;
245    int sopt;
246    if (TranslateOption(opt, &slevel, &sopt) == -1)
247      return -1;
248    socklen_t optlen = sizeof(*value);
249    int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
250    if (ret != -1 && opt == OPT_DONTFRAGMENT) {
251#ifdef LINUX
252      *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
253#endif
254    }
255    return ret;
256  }
257
258  int SetOption(Option opt, int value) {
259    int slevel;
260    int sopt;
261    if (TranslateOption(opt, &slevel, &sopt) == -1)
262      return -1;
263    if (opt == OPT_DONTFRAGMENT) {
264#ifdef LINUX
265      value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
266#endif
267    }
268    return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
269  }
270
271  int Send(const void *pv, size_t cb) {
272    int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
273#ifdef LINUX
274        // Suppress SIGPIPE. Without this, attempting to send on a socket whose
275        // other end is closed will result in a SIGPIPE signal being raised to
276        // our process, which by default will terminate the process, which we
277        // don't want. By specifying this flag, we'll just get the error EPIPE
278        // instead and can handle the error gracefully.
279        MSG_NOSIGNAL
280#else
281        0
282#endif
283        );
284    UpdateLastError();
285    // We have seen minidumps where this may be false.
286    ASSERT(sent <= static_cast<int>(cb));
287    if ((sent < 0) && IsBlockingError(error_)) {
288      enabled_events_ |= DE_WRITE;
289    }
290    return sent;
291  }
292
293  int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
294    sockaddr_in saddr;
295    addr.ToSockAddr(&saddr);
296    int sent = ::sendto(
297        s_, (const char *)pv, (int)cb,
298#ifdef LINUX
299        // Suppress SIGPIPE. See above for explanation.
300        MSG_NOSIGNAL,
301#else
302        0,
303#endif
304        (sockaddr*)&saddr, sizeof(saddr));
305    UpdateLastError();
306    // We have seen minidumps where this may be false.
307    ASSERT(sent <= static_cast<int>(cb));
308    if ((sent < 0) && IsBlockingError(error_)) {
309      enabled_events_ |= DE_WRITE;
310    }
311    return sent;
312  }
313
314  int Recv(void *pv, size_t cb) {
315    int received = ::recv(s_, (char *)pv, (int)cb, 0);
316    if ((received == 0) && (cb != 0)) {
317      // Note: on graceful shutdown, recv can return 0.  In this case, we
318      // pretend it is blocking, and then signal close, so that simplifying
319      // assumptions can be made about Recv.
320      LOG(LS_WARNING) << "EOF from socket; deferring close event";
321      // Must turn this back on so that the select() loop will notice the close
322      // event.
323      enabled_events_ |= DE_READ;
324      error_ = EWOULDBLOCK;
325      return SOCKET_ERROR;
326    }
327    UpdateLastError();
328    bool success = (received >= 0) || IsBlockingError(error_);
329    if (udp_ || success) {
330      enabled_events_ |= DE_READ;
331    }
332    if (!success) {
333      LOG_F(LS_VERBOSE) << "Error = " << error_;
334    }
335    return received;
336  }
337
338  int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
339    sockaddr_in saddr;
340    socklen_t cbAddr = sizeof(saddr);
341    int received = ::recvfrom(s_, (char *)pv, (int)cb, 0, (sockaddr*)&saddr,
342                              &cbAddr);
343    UpdateLastError();
344    if ((received >= 0) && (paddr != NULL))
345      paddr->FromSockAddr(saddr);
346    bool success = (received >= 0) || IsBlockingError(error_);
347    if (udp_ || success) {
348      enabled_events_ |= DE_READ;
349    }
350    if (!success) {
351      LOG_F(LS_VERBOSE) << "Error = " << error_;
352    }
353    return received;
354  }
355
356  int Listen(int backlog) {
357    int err = ::listen(s_, backlog);
358    UpdateLastError();
359    if (err == 0) {
360      state_ = CS_CONNECTING;
361      enabled_events_ |= DE_ACCEPT;
362#ifdef _DEBUG
363      dbg_addr_ = "Listening @ ";
364      dbg_addr_.append(GetLocalAddress().ToString());
365#endif  // _DEBUG
366    }
367    return err;
368  }
369
370  AsyncSocket* Accept(SocketAddress *paddr) {
371    sockaddr_in saddr;
372    socklen_t cbAddr = sizeof(saddr);
373    SOCKET s = ::accept(s_, (sockaddr*)&saddr, &cbAddr);
374    UpdateLastError();
375    if (s == INVALID_SOCKET)
376      return NULL;
377    enabled_events_ |= DE_ACCEPT;
378    if (paddr != NULL)
379      paddr->FromSockAddr(saddr);
380    return ss_->WrapSocket(s);
381  }
382
383  int Close() {
384    if (s_ == INVALID_SOCKET)
385      return 0;
386    int err = ::closesocket(s_);
387    UpdateLastError();
388    s_ = INVALID_SOCKET;
389    state_ = CS_CLOSED;
390    enabled_events_ = 0;
391    if (resolver_) {
392      resolver_->Destroy(false);
393      resolver_ = NULL;
394    }
395    return err;
396  }
397
398  int EstimateMTU(uint16* mtu) {
399    SocketAddress addr = GetRemoteAddress();
400    if (addr.IsAny()) {
401      error_ = ENOTCONN;
402      return -1;
403    }
404
405#if defined(WIN32)
406    // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
407    WinPing ping;
408    if (!ping.IsValid()) {
409      error_ = EINVAL; // can't think of a better error ID
410      return -1;
411    }
412
413    for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
414      int32 size = PACKET_MAXIMUMS[level] - IP_HEADER_SIZE - ICMP_HEADER_SIZE;
415      WinPing::PingResult result = ping.Ping(addr.ip(), size, 0, 1, false);
416      if (result == WinPing::PING_FAIL) {
417        error_ = EINVAL; // can't think of a better error ID
418        return -1;
419      } else if (result != WinPing::PING_TOO_LARGE) {
420        *mtu = PACKET_MAXIMUMS[level];
421        return 0;
422      }
423    }
424
425    ASSERT(false);
426    return -1;
427#elif defined(IOS) || defined(OSX)
428    // No simple way to do this on Mac OS X.
429    // SIOCGIFMTU would work if we knew which interface would be used, but
430    // figuring that out is pretty complicated. For now we'll return an error
431    // and let the caller pick a default MTU.
432    error_ = EINVAL;
433    return -1;
434#elif defined(LINUX) || defined(ANDROID)
435    // Gets the path MTU.
436    int value;
437    socklen_t vlen = sizeof(value);
438    int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
439    if (err < 0) {
440      UpdateLastError();
441      return err;
442    }
443
444    ASSERT((0 <= value) && (value <= 65536));
445    *mtu = value;
446    return 0;
447#endif
448  }
449
450  SocketServer* socketserver() { return ss_; }
451
452 protected:
453  void OnResolveResult(SignalThread* thread) {
454    if (thread != resolver_) {
455      return;
456    }
457
458    int error = resolver_->error();
459    if (error == 0) {
460      error = DoConnect(resolver_->address());
461    } else {
462      Close();
463    }
464
465    if (error) {
466      error_ = error;
467      SignalCloseEvent(this, error_);
468    }
469  }
470
471  void UpdateLastError() {
472    error_ = LAST_SYSTEM_ERROR;
473  }
474
475  static int TranslateOption(Option opt, int* slevel, int* sopt) {
476    switch (opt) {
477      case OPT_DONTFRAGMENT:
478#ifdef WIN32
479        *slevel = IPPROTO_IP;
480        *sopt = IP_DONTFRAGMENT;
481        break;
482#elif defined(IOS) || defined(OSX) || defined(BSD)
483        LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
484        return -1;
485#elif defined(POSIX)
486        *slevel = IPPROTO_IP;
487        *sopt = IP_MTU_DISCOVER;
488        break;
489#endif
490      case OPT_RCVBUF:
491        *slevel = SOL_SOCKET;
492        *sopt = SO_RCVBUF;
493        break;
494      case OPT_SNDBUF:
495        *slevel = SOL_SOCKET;
496        *sopt = SO_SNDBUF;
497        break;
498      case OPT_NODELAY:
499        *slevel = IPPROTO_TCP;
500        *sopt = TCP_NODELAY;
501        break;
502      default:
503        ASSERT(false);
504        return -1;
505    }
506    return 0;
507  }
508
509  PhysicalSocketServer* ss_;
510  SOCKET s_;
511  uint8 enabled_events_;
512  bool udp_;
513  int error_;
514  ConnState state_;
515  AsyncResolver* resolver_;
516
517#ifdef _DEBUG
518  std::string dbg_addr_;
519#endif  // _DEBUG;
520};
521
522#ifdef POSIX
523class EventDispatcher : public Dispatcher {
524 public:
525  EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
526    if (pipe(afd_) < 0)
527      LOG(LERROR) << "pipe failed";
528    ss_->Add(this);
529  }
530
531  virtual ~EventDispatcher() {
532    ss_->Remove(this);
533    close(afd_[0]);
534    close(afd_[1]);
535  }
536
537  virtual void Signal() {
538    CritScope cs(&crit_);
539    if (!fSignaled_) {
540      const uint8 b[1] = { 0 };
541      if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) {
542        fSignaled_ = true;
543      }
544    }
545  }
546
547  virtual uint32 GetRequestedEvents() {
548    return DE_READ;
549  }
550
551  virtual void OnPreEvent(uint32 ff) {
552    // It is not possible to perfectly emulate an auto-resetting event with
553    // pipes.  This simulates it by resetting before the event is handled.
554
555    CritScope cs(&crit_);
556    if (fSignaled_) {
557      uint8 b[4];  // Allow for reading more than 1 byte, but expect 1.
558      VERIFY(1 == read(afd_[0], b, sizeof(b)));
559      fSignaled_ = false;
560    }
561  }
562
563  virtual void OnEvent(uint32 ff, int err) {
564    ASSERT(false);
565  }
566
567  virtual int GetDescriptor() {
568    return afd_[0];
569  }
570
571  virtual bool IsDescriptorClosed() {
572    return false;
573  }
574
575 private:
576  PhysicalSocketServer *ss_;
577  int afd_[2];
578  bool fSignaled_;
579  CriticalSection crit_;
580};
581
582// These two classes use the self-pipe trick to deliver POSIX signals to our
583// select loop. This is the only safe, reliable, cross-platform way to do
584// non-trivial things with a POSIX signal in an event-driven program (until
585// proper pselect() implementations become ubiquitous).
586
587class PosixSignalHandler {
588 public:
589  // POSIX only specifies 32 signals, but in principle the system might have
590  // more and the programmer might choose to use them, so we size our array
591  // for 128.
592  static const int kNumPosixSignals = 128;
593
594  static PosixSignalHandler *Instance() { return &instance_; }
595
596  // Returns true if the given signal number is set.
597  bool IsSignalSet(int signum) const {
598    ASSERT(signum < ARRAY_SIZE(received_signal_));
599    if (signum < ARRAY_SIZE(received_signal_)) {
600      return received_signal_[signum];
601    } else {
602      return false;
603    }
604  }
605
606  // Clears the given signal number.
607  void ClearSignal(int signum) {
608    ASSERT(signum < ARRAY_SIZE(received_signal_));
609    if (signum < ARRAY_SIZE(received_signal_)) {
610      received_signal_[signum] = false;
611    }
612  }
613
614  // Returns the file descriptor to monitor for signal events.
615  int GetDescriptor() const {
616    return afd_[0];
617  }
618
619  // This is called directly from our real signal handler, so it must be
620  // signal-handler-safe. That means it cannot assume anything about the
621  // user-level state of the process, since the handler could be executed at any
622  // time on any thread.
623  void OnPosixSignalReceived(int signum) {
624    if (signum >= ARRAY_SIZE(received_signal_)) {
625      // We don't have space in our array for this.
626      return;
627    }
628    // Set a flag saying we've seen this signal.
629    received_signal_[signum] = true;
630    // Notify application code that we got a signal.
631    const uint8 b[1] = { 0 };
632    if (-1 == write(afd_[1], b, sizeof(b))) {
633      // Nothing we can do here. If there's an error somehow then there's
634      // nothing we can safely do from a signal handler.
635      // No, we can't even safely log it.
636      // But, we still have to check the return value here. Otherwise,
637      // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
638      return;
639    }
640  }
641
642 private:
643  PosixSignalHandler() {
644    if (pipe(afd_) < 0) {
645      LOG_ERR(LS_ERROR) << "pipe failed";
646      return;
647    }
648    if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
649      LOG_ERR(LS_WARNING) << "fcntl #1 failed";
650    }
651    if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
652      LOG_ERR(LS_WARNING) << "fcntl #2 failed";
653    }
654    memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)),
655           0,
656           sizeof(received_signal_));
657  }
658
659  ~PosixSignalHandler() {
660    int fd1 = afd_[0];
661    int fd2 = afd_[1];
662    // We clobber the stored file descriptor numbers here or else in principle
663    // a signal that happens to be delivered during application termination
664    // could erroneously write a zero byte to an unrelated file handle in
665    // OnPosixSignalReceived() if some other file happens to be opened later
666    // during shutdown and happens to be given the same file descriptor number
667    // as our pipe had. Unfortunately even with this precaution there is still a
668    // race where that could occur if said signal happens to be handled
669    // concurrently with this code and happens to have already read the value of
670    // afd_[1] from memory before we clobber it, but that's unlikely.
671    afd_[0] = -1;
672    afd_[1] = -1;
673    close(fd1);
674    close(fd2);
675  }
676
677  // There is just a single global instance. (Signal handlers do not get any
678  // sort of user-defined void * parameter, so they can't access anything that
679  // isn't global.)
680  static PosixSignalHandler instance_;
681
682  int afd_[2];
683  // These are boolean flags that will be set in our signal handler and read
684  // and cleared from Wait(). There is a race involved in this, but it is
685  // benign. The signal handler sets the flag before signaling the pipe, so
686  // we'll never end up blocking in select() while a flag is still true.
687  // However, if two of the same signal arrive close to each other then it's
688  // possible that the second time the handler may set the flag while it's still
689  // true, meaning that signal will be missed. But the first occurrence of it
690  // will still be handled, so this isn't a problem.
691  // Volatile is not necessary here for correctness, but this data _is_ volatile
692  // so I've marked it as such.
693  volatile uint8 received_signal_[kNumPosixSignals];
694};
695
696PosixSignalHandler PosixSignalHandler::instance_;
697
698class PosixSignalDispatcher : public Dispatcher {
699 public:
700  PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) {
701    owner_->Add(this);
702  }
703
704  virtual ~PosixSignalDispatcher() {
705    owner_->Remove(this);
706  }
707
708  virtual uint32 GetRequestedEvents() {
709    return DE_READ;
710  }
711
712  virtual void OnPreEvent(uint32 ff) {
713    // Events might get grouped if signals come very fast, so we read out up to
714    // 16 bytes to make sure we keep the pipe empty.
715    uint8 b[16];
716    ssize_t ret = read(GetDescriptor(), b, sizeof(b));
717    if (ret < 0) {
718      LOG_ERR(LS_WARNING) << "Error in read()";
719    } else if (ret == 0) {
720      LOG(LS_WARNING) << "Should have read at least one byte";
721    }
722  }
723
724  virtual void OnEvent(uint32 ff, int err) {
725    for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
726         ++signum) {
727      if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
728        PosixSignalHandler::Instance()->ClearSignal(signum);
729        HandlerMap::iterator i = handlers_.find(signum);
730        if (i == handlers_.end()) {
731          // This can happen if a signal is delivered to our process at around
732          // the same time as we unset our handler for it. It is not an error
733          // condition, but it's unusual enough to be worth logging.
734          LOG(LS_INFO) << "Received signal with no handler: " << signum;
735        } else {
736          // Otherwise, execute our handler.
737          (*i->second)(signum);
738        }
739      }
740    }
741  }
742
743  virtual int GetDescriptor() {
744    return PosixSignalHandler::Instance()->GetDescriptor();
745  }
746
747  virtual bool IsDescriptorClosed() {
748    return false;
749  }
750
751  void SetHandler(int signum, void (*handler)(int)) {
752    handlers_[signum] = handler;
753  }
754
755  void ClearHandler(int signum) {
756    handlers_.erase(signum);
757  }
758
759  bool HasHandlers() {
760    return !handlers_.empty();
761  }
762
763 private:
764  typedef std::map<int, void (*)(int)> HandlerMap;
765
766  HandlerMap handlers_;
767  // Our owner.
768  PhysicalSocketServer *owner_;
769};
770
771class SocketDispatcher : public Dispatcher, public PhysicalSocket {
772 public:
773  explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
774  }
775  SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
776  }
777
778  virtual ~SocketDispatcher() {
779    Close();
780  }
781
782  bool Initialize() {
783    ss_->Add(this);
784    fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
785    return true;
786  }
787
788  virtual bool Create(int type) {
789    // Change the socket to be non-blocking.
790    if (!PhysicalSocket::Create(type))
791      return false;
792
793    return Initialize();
794  }
795
796  virtual int GetDescriptor() {
797    return s_;
798  }
799
800  virtual bool IsDescriptorClosed() {
801    // We don't have a reliable way of distinguishing end-of-stream
802    // from readability.  So test on each readable call.  Is this
803    // inefficient?  Probably.
804    char ch;
805    ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
806    if (res > 0) {
807      // Data available, so not closed.
808      return false;
809    } else if (res == 0) {
810      // EOF, so closed.
811      return true;
812    } else {  // error
813      switch (errno) {
814        // Returned if we've already closed s_.
815        case EBADF:
816        // Returned during ungraceful peer shutdown.
817        case ECONNRESET:
818          return true;
819        default:
820          // Assume that all other errors are just blocking errors, meaning the
821          // connection is still good but we just can't read from it right now.
822          // This should only happen when connecting (and at most once), because
823          // in all other cases this function is only called if the file
824          // descriptor is already known to be in the readable state. However,
825          // it's not necessary a problem if we spuriously interpret a
826          // "connection lost"-type error as a blocking error, because typically
827          // the next recv() will get EOF, so we'll still eventually notice that
828          // the socket is closed.
829          LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
830          return false;
831      }
832    }
833  }
834
835  virtual uint32 GetRequestedEvents() {
836    return enabled_events_;
837  }
838
839  virtual void OnPreEvent(uint32 ff) {
840    if ((ff & DE_CONNECT) != 0)
841      state_ = CS_CONNECTED;
842    if ((ff & DE_CLOSE) != 0)
843      state_ = CS_CLOSED;
844  }
845
846  virtual void OnEvent(uint32 ff, int err) {
847    if ((ff & DE_READ) != 0) {
848      enabled_events_ &= ~DE_READ;
849      SignalReadEvent(this);
850    }
851    if ((ff & DE_WRITE) != 0) {
852      enabled_events_ &= ~DE_WRITE;
853      SignalWriteEvent(this);
854    }
855    if ((ff & DE_CONNECT) != 0) {
856      enabled_events_ &= ~DE_CONNECT;
857      SignalConnectEvent(this);
858    }
859    if ((ff & DE_ACCEPT) != 0) {
860      enabled_events_ &= ~DE_ACCEPT;
861      SignalReadEvent(this);
862    }
863    if ((ff & DE_CLOSE) != 0) {
864      // The socket is now dead to us, so stop checking it.
865      enabled_events_ = 0;
866      SignalCloseEvent(this, err);
867    }
868  }
869
870  virtual int Close() {
871    if (s_ == INVALID_SOCKET)
872      return 0;
873
874    ss_->Remove(this);
875    return PhysicalSocket::Close();
876  }
877};
878
879class FileDispatcher: public Dispatcher, public AsyncFile {
880 public:
881  FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
882    set_readable(true);
883
884    ss_->Add(this);
885
886    fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
887  }
888
889  virtual ~FileDispatcher() {
890    ss_->Remove(this);
891  }
892
893  SocketServer* socketserver() { return ss_; }
894
895  virtual int GetDescriptor() {
896    return fd_;
897  }
898
899  virtual bool IsDescriptorClosed() {
900    return false;
901  }
902
903  virtual uint32 GetRequestedEvents() {
904    return flags_;
905  }
906
907  virtual void OnPreEvent(uint32 ff) {
908  }
909
910  virtual void OnEvent(uint32 ff, int err) {
911    if ((ff & DE_READ) != 0)
912      SignalReadEvent(this);
913    if ((ff & DE_WRITE) != 0)
914      SignalWriteEvent(this);
915    if ((ff & DE_CLOSE) != 0)
916      SignalCloseEvent(this, err);
917  }
918
919  virtual bool readable() {
920    return (flags_ & DE_READ) != 0;
921  }
922
923  virtual void set_readable(bool value) {
924    flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ);
925  }
926
927  virtual bool writable() {
928    return (flags_ & DE_WRITE) != 0;
929  }
930
931  virtual void set_writable(bool value) {
932    flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE);
933  }
934
935 private:
936  PhysicalSocketServer* ss_;
937  int fd_;
938  int flags_;
939};
940
941AsyncFile* PhysicalSocketServer::CreateFile(int fd) {
942  return new FileDispatcher(fd, this);
943}
944
945#endif // POSIX
946
947#ifdef WIN32
948static uint32 FlagsToEvents(uint32 events) {
949  uint32 ffFD = FD_CLOSE;
950  if (events & DE_READ)
951    ffFD |= FD_READ;
952  if (events & DE_WRITE)
953    ffFD |= FD_WRITE;
954  if (events & DE_CONNECT)
955    ffFD |= FD_CONNECT;
956  if (events & DE_ACCEPT)
957    ffFD |= FD_ACCEPT;
958  return ffFD;
959}
960
961class EventDispatcher : public Dispatcher {
962 public:
963  EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {
964    hev_ = WSACreateEvent();
965    if (hev_) {
966      ss_->Add(this);
967    }
968  }
969
970  ~EventDispatcher() {
971    if (hev_ != NULL) {
972      ss_->Remove(this);
973      WSACloseEvent(hev_);
974      hev_ = NULL;
975    }
976  }
977
978  virtual void Signal() {
979    if (hev_ != NULL)
980      WSASetEvent(hev_);
981  }
982
983  virtual uint32 GetRequestedEvents() {
984    return 0;
985  }
986
987  virtual void OnPreEvent(uint32 ff) {
988    WSAResetEvent(hev_);
989  }
990
991  virtual void OnEvent(uint32 ff, int err) {
992  }
993
994  virtual WSAEVENT GetWSAEvent() {
995    return hev_;
996  }
997
998  virtual SOCKET GetSocket() {
999    return INVALID_SOCKET;
1000  }
1001
1002  virtual bool CheckSignalClose() { return false; }
1003
1004private:
1005  PhysicalSocketServer* ss_;
1006  WSAEVENT hev_;
1007};
1008
1009class SocketDispatcher : public Dispatcher, public PhysicalSocket {
1010 public:
1011  static int next_id_;
1012  int id_;
1013  bool signal_close_;
1014  int signal_err_;
1015
1016  SocketDispatcher(PhysicalSocketServer* ss)
1017      : PhysicalSocket(ss),
1018        id_(0),
1019        signal_close_(false) {
1020  }
1021
1022  SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
1023      : PhysicalSocket(ss, s),
1024        id_(0),
1025        signal_close_(false) {
1026  }
1027
1028  virtual ~SocketDispatcher() {
1029    Close();
1030  }
1031
1032  bool Initialize() {
1033    ASSERT(s_ != INVALID_SOCKET);
1034    // Must be a non-blocking
1035    u_long argp = 1;
1036    ioctlsocket(s_, FIONBIO, &argp);
1037    ss_->Add(this);
1038    return true;
1039  }
1040
1041  virtual bool Create(int type) {
1042    // Create socket
1043    if (!PhysicalSocket::Create(type))
1044      return false;
1045
1046    if (!Initialize())
1047      return false;
1048
1049    do { id_ = ++next_id_; } while (id_ == 0);
1050    return true;
1051  }
1052
1053  virtual int Close() {
1054    if (s_ == INVALID_SOCKET)
1055      return 0;
1056
1057    id_ = 0;
1058    signal_close_ = false;
1059    ss_->Remove(this);
1060    return PhysicalSocket::Close();
1061  }
1062
1063  virtual uint32 GetRequestedEvents() {
1064    return enabled_events_;
1065  }
1066
1067  virtual void OnPreEvent(uint32 ff) {
1068    if ((ff & DE_CONNECT) != 0)
1069      state_ = CS_CONNECTED;
1070    // We set CS_CLOSED from CheckSignalClose.
1071  }
1072
1073  virtual void OnEvent(uint32 ff, int err) {
1074    int cache_id = id_;
1075    if ((ff & DE_READ) != 0) {
1076      enabled_events_ &= ~DE_READ;
1077      SignalReadEvent(this);
1078    }
1079    if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
1080      enabled_events_ &= ~DE_WRITE;
1081      SignalWriteEvent(this);
1082    }
1083    if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
1084      if (ff != DE_CONNECT)
1085        LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
1086      enabled_events_ &= ~DE_CONNECT;
1087#ifdef _DEBUG
1088      dbg_addr_ = "Connected @ ";
1089      dbg_addr_.append(GetRemoteAddress().ToString());
1090#endif  // _DEBUG
1091      SignalConnectEvent(this);
1092    }
1093    if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
1094      enabled_events_ &= ~DE_ACCEPT;
1095      SignalReadEvent(this);
1096    }
1097    if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
1098      signal_close_ = true;
1099      signal_err_ = err;
1100    }
1101  }
1102
1103  virtual WSAEVENT GetWSAEvent() {
1104    return WSA_INVALID_EVENT;
1105  }
1106
1107  virtual SOCKET GetSocket() {
1108    return s_;
1109  }
1110
1111  virtual bool CheckSignalClose() {
1112    if (!signal_close_)
1113      return false;
1114
1115    char ch;
1116    if (recv(s_, &ch, 1, MSG_PEEK) > 0)
1117      return false;
1118
1119    state_ = CS_CLOSED;
1120    signal_close_ = false;
1121    SignalCloseEvent(this, signal_err_);
1122    return true;
1123  }
1124};
1125
1126int SocketDispatcher::next_id_ = 0;
1127
1128#endif  // WIN32
1129
1130// Sets the value of a boolean value to false when signaled.
1131class Signaler : public EventDispatcher {
1132 public:
1133  Signaler(PhysicalSocketServer* ss, bool* pf)
1134      : EventDispatcher(ss), pf_(pf) {
1135  }
1136  virtual ~Signaler() { }
1137
1138  void OnEvent(uint32 ff, int err) {
1139    if (pf_)
1140      *pf_ = false;
1141  }
1142
1143 private:
1144  bool *pf_;
1145};
1146
1147PhysicalSocketServer::PhysicalSocketServer()
1148    : fWait_(false),
1149      last_tick_tracked_(0),
1150      last_tick_dispatch_count_(0) {
1151  signal_wakeup_ = new Signaler(this, &fWait_);
1152#ifdef WIN32
1153  socket_ev_ = WSACreateEvent();
1154#endif
1155}
1156
// Releases the resources this server owns directly. Every other dispatcher
// must already have unregistered itself; the ASSERT at the end checks that
// the wake-up dispatcher and the signal dispatcher were the last ones left.
PhysicalSocketServer::~PhysicalSocketServer() {
#ifdef WIN32
  WSACloseEvent(socket_ev_);
#endif
#ifdef POSIX
  // Destroy the signal dispatcher (if any). Presumably its destructor removes
  // it from dispatchers_ like EventDispatcher's does — TODO confirm.
  signal_dispatcher_.reset();
#endif
  // An EventDispatcher's destructor calls Remove() on this server, so this
  // must run while dispatchers_ and crit_ are still alive.
  delete signal_wakeup_;
  ASSERT(dispatchers_.empty());
}
1167
1168void PhysicalSocketServer::WakeUp() {
1169  signal_wakeup_->Signal();
1170}
1171
1172Socket* PhysicalSocketServer::CreateSocket(int type) {
1173  PhysicalSocket* socket = new PhysicalSocket(this);
1174  if (socket->Create(type)) {
1175    return socket;
1176  } else {
1177    delete socket;
1178    return 0;
1179  }
1180}
1181
1182AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
1183  SocketDispatcher* dispatcher = new SocketDispatcher(this);
1184  if (dispatcher->Create(type)) {
1185    return dispatcher;
1186  } else {
1187    delete dispatcher;
1188    return 0;
1189  }
1190}
1191
1192AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1193  SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1194  if (dispatcher->Initialize()) {
1195    return dispatcher;
1196  } else {
1197    delete dispatcher;
1198    return 0;
1199  }
1200}
1201
1202void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1203  CritScope cs(&crit_);
1204  // Prevent duplicates. This can cause dead dispatchers to stick around.
1205  DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1206                                           dispatchers_.end(),
1207                                           pdispatcher);
1208  if (pos != dispatchers_.end())
1209    return;
1210  dispatchers_.push_back(pdispatcher);
1211}
1212
1213void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1214  CritScope cs(&crit_);
1215  DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1216                                           dispatchers_.end(),
1217                                           pdispatcher);
1218  ASSERT(pos != dispatchers_.end());
1219  size_t index = pos - dispatchers_.begin();
1220  dispatchers_.erase(pos);
1221  for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1222       ++it) {
1223    if (index < **it) {
1224      --**it;
1225    }
1226  }
1227}
1228
1229#ifdef POSIX
1230bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1231  // Calculate timing information
1232
1233  struct timeval *ptvWait = NULL;
1234  struct timeval tvWait;
1235  struct timeval tvStop;
1236  if (cmsWait != kForever) {
1237    // Calculate wait timeval
1238    tvWait.tv_sec = cmsWait / 1000;
1239    tvWait.tv_usec = (cmsWait % 1000) * 1000;
1240    ptvWait = &tvWait;
1241
1242    // Calculate when to return in a timeval
1243    gettimeofday(&tvStop, NULL);
1244    tvStop.tv_sec += tvWait.tv_sec;
1245    tvStop.tv_usec += tvWait.tv_usec;
1246    if (tvStop.tv_usec >= 1000000) {
1247      tvStop.tv_usec -= 1000000;
1248      tvStop.tv_sec += 1;
1249    }
1250  }
1251
1252  // Zero all fd_sets. Don't need to do this inside the loop since
1253  // select() zeros the descriptors not signaled
1254
1255  fd_set fdsRead;
1256  FD_ZERO(&fdsRead);
1257  fd_set fdsWrite;
1258  FD_ZERO(&fdsWrite);
1259
1260  fWait_ = true;
1261
1262  while (fWait_) {
1263    int fdmax = -1;
1264    {
1265      CritScope cr(&crit_);
1266      for (size_t i = 0; i < dispatchers_.size(); ++i) {
1267        // Query dispatchers for read and write wait state
1268        Dispatcher *pdispatcher = dispatchers_[i];
1269        ASSERT(pdispatcher);
1270        if (!process_io && (pdispatcher != signal_wakeup_))
1271          continue;
1272        int fd = pdispatcher->GetDescriptor();
1273        if (fd > fdmax)
1274          fdmax = fd;
1275
1276        uint32 ff = pdispatcher->GetRequestedEvents();
1277        if (ff & (DE_READ | DE_ACCEPT))
1278          FD_SET(fd, &fdsRead);
1279        if (ff & (DE_WRITE | DE_CONNECT))
1280          FD_SET(fd, &fdsWrite);
1281      }
1282    }
1283
1284    // Wait then call handlers as appropriate
1285    // < 0 means error
1286    // 0 means timeout
1287    // > 0 means count of descriptors ready
1288    int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);
1289
1290    // If error, return error.
1291    if (n < 0) {
1292      if (errno != EINTR) {
1293        LOG_E(LS_ERROR, EN, errno) << "select";
1294        return false;
1295      }
1296      // Else ignore the error and keep going. If this EINTR was for one of the
1297      // signals managed by this PhysicalSocketServer, the
1298      // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1299      // iteration.
1300    } else if (n == 0) {
1301      // If timeout, return success
1302      return true;
1303    } else {
1304      // We have signaled descriptors
1305      CritScope cr(&crit_);
1306      for (size_t i = 0; i < dispatchers_.size(); ++i) {
1307        Dispatcher *pdispatcher = dispatchers_[i];
1308        int fd = pdispatcher->GetDescriptor();
1309        uint32 ff = 0;
1310        int errcode = 0;
1311
1312        // Reap any error code, which can be signaled through reads or writes.
1313        // TODO: Should we set errcode if getsockopt fails?
1314        if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
1315          socklen_t len = sizeof(errcode);
1316          ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1317        }
1318
1319        // Check readable descriptors. If we're waiting on an accept, signal
1320        // that. Otherwise we're waiting for data, check to see if we're
1321        // readable or really closed.
1322        // TODO: Only peek at TCP descriptors.
1323        if (FD_ISSET(fd, &fdsRead)) {
1324          FD_CLR(fd, &fdsRead);
1325          if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1326            ff |= DE_ACCEPT;
1327          } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1328            ff |= DE_CLOSE;
1329          } else {
1330            ff |= DE_READ;
1331          }
1332        }
1333
1334        // Check writable descriptors. If we're waiting on a connect, detect
1335        // success versus failure by the reaped error code.
1336        if (FD_ISSET(fd, &fdsWrite)) {
1337          FD_CLR(fd, &fdsWrite);
1338          if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1339            if (!errcode) {
1340              ff |= DE_CONNECT;
1341            } else {
1342              ff |= DE_CLOSE;
1343            }
1344          } else {
1345            ff |= DE_WRITE;
1346          }
1347        }
1348
1349        // Tell the descriptor about the event.
1350        if (ff != 0) {
1351          pdispatcher->OnPreEvent(ff);
1352          pdispatcher->OnEvent(ff, errcode);
1353        }
1354      }
1355    }
1356
1357    // Recalc the time remaining to wait. Doing it here means it doesn't get
1358    // calced twice the first time through the loop
1359
1360    if (cmsWait != kForever) {
1361      ptvWait->tv_sec = 0;
1362      ptvWait->tv_usec = 0;
1363      struct timeval tvT;
1364      gettimeofday(&tvT, NULL);
1365      if ((tvStop.tv_sec > tvT.tv_sec)
1366          || ((tvStop.tv_sec == tvT.tv_sec)
1367              && (tvStop.tv_usec > tvT.tv_usec))) {
1368        ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1369        ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1370        if (ptvWait->tv_usec < 0) {
1371          ASSERT(ptvWait->tv_sec > 0);
1372          ptvWait->tv_usec += 1000000;
1373          ptvWait->tv_sec -= 1;
1374        }
1375      }
1376    }
1377  }
1378
1379  return true;
1380}
1381
1382static void GlobalSignalHandler(int signum) {
1383  PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1384}
1385
1386bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1387                                                 void (*handler)(int)) {
1388  // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1389  // otherwise set one.
1390  if (handler == SIG_IGN || handler == SIG_DFL) {
1391    if (!InstallSignal(signum, handler)) {
1392      return false;
1393    }
1394    if (signal_dispatcher_.get()) {
1395      signal_dispatcher_->ClearHandler(signum);
1396      if (!signal_dispatcher_->HasHandlers()) {
1397        signal_dispatcher_.reset();
1398      }
1399    }
1400  } else {
1401    if (!signal_dispatcher_.get()) {
1402      signal_dispatcher_.reset(new PosixSignalDispatcher(this));
1403    }
1404    signal_dispatcher_->SetHandler(signum, handler);
1405    if (!InstallSignal(signum, &GlobalSignalHandler)) {
1406      return false;
1407    }
1408  }
1409  return true;
1410}
1411
1412bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
1413  struct sigaction act;
1414  // It doesn't really matter what we set this mask to.
1415  if (sigemptyset(&act.sa_mask) != 0) {
1416    LOG_ERR(LS_ERROR) << "Couldn't set mask";
1417    return false;
1418  }
1419  act.sa_handler = handler;
1420  // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
1421  // and it's a nuisance. Though some syscalls still return EINTR and there's no
1422  // real standard for which ones. :(
1423  act.sa_flags = SA_RESTART;
1424  if (sigaction(signum, &act, NULL) != 0) {
1425    LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
1426    return false;
1427  }
1428  return true;
1429}
1430#endif  // POSIX
1431
1432#ifdef WIN32
1433bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1434  int cmsTotal = cmsWait;
1435  int cmsElapsed = 0;
1436  uint32 msStart = Time();
1437
1438#if LOGGING
1439  if (last_tick_dispatch_count_ == 0) {
1440    last_tick_tracked_ = msStart;
1441  }
1442#endif
1443
1444  fWait_ = true;
1445  while (fWait_) {
1446    std::vector<WSAEVENT> events;
1447    std::vector<Dispatcher *> event_owners;
1448
1449    events.push_back(socket_ev_);
1450
1451    {
1452      CritScope cr(&crit_);
1453      size_t i = 0;
1454      iterators_.push_back(&i);
1455      // Don't track dispatchers_.size(), because we want to pick up any new
1456      // dispatchers that were added while processing the loop.
1457      while (i < dispatchers_.size()) {
1458        Dispatcher* disp = dispatchers_[i++];
1459        if (!process_io && (disp != signal_wakeup_))
1460          continue;
1461        SOCKET s = disp->GetSocket();
1462        if (disp->CheckSignalClose()) {
1463          // We just signalled close, don't poll this socket
1464        } else if (s != INVALID_SOCKET) {
1465          WSAEventSelect(s,
1466                         events[0],
1467                         FlagsToEvents(disp->GetRequestedEvents()));
1468        } else {
1469          events.push_back(disp->GetWSAEvent());
1470          event_owners.push_back(disp);
1471        }
1472      }
1473      ASSERT(iterators_.back() == &i);
1474      iterators_.pop_back();
1475    }
1476
1477    // Which is shorter, the delay wait or the asked wait?
1478
1479    int cmsNext;
1480    if (cmsWait == kForever) {
1481      cmsNext = cmsWait;
1482    } else {
1483      cmsNext = _max(0, cmsTotal - cmsElapsed);
1484    }
1485
1486    // Wait for one of the events to signal
1487    DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
1488                                        &events[0],
1489                                        false,
1490                                        cmsNext,
1491                                        false);
1492
1493#if 0  // LOGGING
1494    // we track this information purely for logging purposes.
1495    last_tick_dispatch_count_++;
1496    if (last_tick_dispatch_count_ >= 1000) {
1497      int32 elapsed = TimeSince(last_tick_tracked_);
1498      LOG(INFO) << "PhysicalSocketServer took " << elapsed
1499                << "ms for 1000 events";
1500
1501      // If we get more than 1000 events in a second, we are spinning badly
1502      // (normally it should take about 8-20 seconds).
1503      ASSERT(elapsed > 1000);
1504
1505      last_tick_tracked_ = Time();
1506      last_tick_dispatch_count_ = 0;
1507    }
1508#endif
1509
1510    if (dw == WSA_WAIT_FAILED) {
1511      // Failed?
1512      // TODO: need a better strategy than this!
1513      int error = WSAGetLastError();
1514      ASSERT(false);
1515      return false;
1516    } else if (dw == WSA_WAIT_TIMEOUT) {
1517      // Timeout?
1518      return true;
1519    } else {
1520      // Figure out which one it is and call it
1521      CritScope cr(&crit_);
1522      int index = dw - WSA_WAIT_EVENT_0;
1523      if (index > 0) {
1524        --index; // The first event is the socket event
1525        event_owners[index]->OnPreEvent(0);
1526        event_owners[index]->OnEvent(0, 0);
1527      } else if (process_io) {
1528        size_t i = 0, end = dispatchers_.size();
1529        iterators_.push_back(&i);
1530        iterators_.push_back(&end);  // Don't iterate over new dispatchers.
1531        while (i < end) {
1532          Dispatcher* disp = dispatchers_[i++];
1533          SOCKET s = disp->GetSocket();
1534          if (s == INVALID_SOCKET)
1535            continue;
1536
1537          WSANETWORKEVENTS wsaEvents;
1538          int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1539          if (err == 0) {
1540
1541#if LOGGING
1542            {
1543              if ((wsaEvents.lNetworkEvents & FD_READ) &&
1544                  wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1545                LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error "
1546                             << wsaEvents.iErrorCode[FD_READ_BIT];
1547              }
1548              if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1549                  wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1550                LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error "
1551                             << wsaEvents.iErrorCode[FD_WRITE_BIT];
1552              }
1553              if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1554                  wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1555                LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error "
1556                             << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1557              }
1558              if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1559                  wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1560                LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1561                             << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1562              }
1563              if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1564                  wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1565                LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error "
1566                             << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1567              }
1568            }
1569#endif
1570            uint32 ff = 0;
1571            int errcode = 0;
1572            if (wsaEvents.lNetworkEvents & FD_READ)
1573              ff |= DE_READ;
1574            if (wsaEvents.lNetworkEvents & FD_WRITE)
1575              ff |= DE_WRITE;
1576            if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1577              if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1578                ff |= DE_CONNECT;
1579              } else {
1580                ff |= DE_CLOSE;
1581                errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1582              }
1583            }
1584            if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1585              ff |= DE_ACCEPT;
1586            if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1587              ff |= DE_CLOSE;
1588              errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1589            }
1590            if (ff != 0) {
1591              disp->OnPreEvent(ff);
1592              disp->OnEvent(ff, errcode);
1593            }
1594          }
1595        }
1596        ASSERT(iterators_.back() == &end);
1597        iterators_.pop_back();
1598        ASSERT(iterators_.back() == &i);
1599        iterators_.pop_back();
1600      }
1601
1602      // Reset the network event until new activity occurs
1603      WSAResetEvent(socket_ev_);
1604    }
1605
1606    // Break?
1607    if (!fWait_)
1608      break;
1609    cmsElapsed = TimeSince(msStart);
1610    if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1611       break;
1612    }
1613  }
1614
1615  // Done
1616  return true;
1617}
1618#endif  // WIN32
1619
1620}  // namespace talk_base
1621