1/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/base/virtualsocketserver.h"
29
30#include <errno.h>
31
32#include <algorithm>
33#include <cmath>
34#include <map>
35#include <vector>
36
37#include "talk/base/common.h"
38#include "talk/base/logging.h"
39#include "talk/base/physicalsocketserver.h"
40#include "talk/base/socketaddresspair.h"
41#include "talk/base/thread.h"
42#include "talk/base/timeutils.h"
43
44namespace talk_base {
45#ifdef WIN32
46const in_addr kInitialNextIPv4 = { {0x01, 0, 0, 0} };
47#else
48// This value is entirely arbitrary, hence the lack of concern about endianness.
49const in_addr kInitialNextIPv4 = { 0x01000000 };
50#endif
51// Starts at ::2 so as to not cause confusion with ::1.
52const in6_addr kInitialNextIPv6 = { { {
53      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2
54    } } };
55
56const uint16 kFirstEphemeralPort = 49152;
57const uint16 kLastEphemeralPort = 65535;
58const uint16 kEphemeralPortCount = kLastEphemeralPort - kFirstEphemeralPort + 1;
59const uint32 kDefaultNetworkCapacity = 64 * 1024;
60const uint32 kDefaultTcpBufferSize = 32 * 1024;
61
62const uint32 UDP_HEADER_SIZE = 28;  // IP + UDP headers
63const uint32 TCP_HEADER_SIZE = 40;  // IP + TCP headers
64const uint32 TCP_MSS = 1400;  // Maximum segment size
65
66// Note: The current algorithm doesn't work for sample sizes smaller than this.
67const int NUM_SAMPLES = 1000;
68
// Message ids posted to VirtualSocket (as a MessageHandler) on the server's
// message queue.
enum {
  MSG_ID_PACKET = 0,      // pdata is a Packet* to append to the recv buffer.
  MSG_ID_CONNECT = 1,     // pdata is a MessageAddress* naming the connecting peer.
  MSG_ID_DISCONNECT = 2   // No payload; the peer went away.
};
74
75// Packets are passed between sockets as messages.  We copy the data just like
76// the kernel does.
77class Packet : public MessageData {
78 public:
79  Packet(const char* data, size_t size, const SocketAddress& from)
80        : size_(size), consumed_(0), from_(from) {
81    ASSERT(NULL != data);
82    data_ = new char[size_];
83    std::memcpy(data_, data, size_);
84  }
85
86  virtual ~Packet() {
87    delete[] data_;
88  }
89
90  const char* data() const { return data_ + consumed_; }
91  size_t size() const { return size_ - consumed_; }
92  const SocketAddress& from() const { return from_; }
93
94  // Remove the first size bytes from the data.
95  void Consume(size_t size) {
96    ASSERT(size + consumed_ < size_);
97    consumed_ += size;
98  }
99
100 private:
101  char* data_;
102  size_t size_, consumed_;
103  SocketAddress from_;
104};
105
106struct MessageAddress : public MessageData {
107  explicit MessageAddress(const SocketAddress& a) : addr(a) { }
108  SocketAddress addr;
109};
110
111// Implements the socket interface using the virtual network.  Packets are
112// passed as messages using the message queue of the socket server.
113class VirtualSocket : public AsyncSocket, public MessageHandler {
114 public:
115  VirtualSocket(VirtualSocketServer* server, int family, int type, bool async)
116      : server_(server), family_(family), type_(type), async_(async),
117        state_(CS_CLOSED), error_(0), listen_queue_(NULL),
118        write_enabled_(false),
119        network_size_(0), recv_buffer_size_(0), bound_(false), was_any_(false) {
120    ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
121    ASSERT(async_ || (type_ != SOCK_STREAM));  // We only support async streams
122  }
123
124  virtual ~VirtualSocket() {
125    Close();
126
127    for (RecvBuffer::iterator it = recv_buffer_.begin();
128         it != recv_buffer_.end(); ++it) {
129      delete *it;
130    }
131  }
132
133  virtual SocketAddress GetLocalAddress() const {
134    return local_addr_;
135  }
136
137  virtual SocketAddress GetRemoteAddress() const {
138    return remote_addr_;
139  }
140
141  // Used by server sockets to set the local address without binding.
142  void SetLocalAddress(const SocketAddress& addr) {
143    local_addr_ = addr;
144  }
145
146  virtual int Bind(const SocketAddress& addr) {
147    if (!local_addr_.IsNil()) {
148      error_ = EINVAL;
149      return -1;
150    }
151    local_addr_ = addr;
152    int result = server_->Bind(this, &local_addr_);
153    if (result != 0) {
154      local_addr_.Clear();
155      error_ = EADDRINUSE;
156    } else {
157      bound_ = true;
158      was_any_ = addr.IsAnyIP();
159    }
160    return result;
161  }
162
163  virtual int Connect(const SocketAddress& addr) {
164    return InitiateConnect(addr, true);
165  }
166
167  virtual int Close() {
168    if (!local_addr_.IsNil() && bound_) {
169      // Remove from the binding table.
170      server_->Unbind(local_addr_, this);
171      bound_ = false;
172    }
173
174    if (SOCK_STREAM == type_) {
175      // Cancel pending sockets
176      if (listen_queue_) {
177        while (!listen_queue_->empty()) {
178          SocketAddress addr = listen_queue_->front();
179
180          // Disconnect listening socket.
181          server_->Disconnect(server_->LookupBinding(addr));
182          listen_queue_->pop_front();
183        }
184        delete listen_queue_;
185        listen_queue_ = NULL;
186      }
187      // Disconnect stream sockets
188      if (CS_CONNECTED == state_) {
189        // Disconnect remote socket, check if it is a child of a server socket.
190        VirtualSocket* socket =
191            server_->LookupConnection(local_addr_, remote_addr_);
192        if (!socket) {
193          // Not a server socket child, then see if it is bound.
194          // TODO: If this is indeed a server socket that has no
195          // children this will cause the server socket to be
196          // closed. This might lead to unexpected results, how to fix this?
197          socket = server_->LookupBinding(remote_addr_);
198        }
199        server_->Disconnect(socket);
200
201        // Remove mapping for both directions.
202        server_->RemoveConnection(remote_addr_, local_addr_);
203        server_->RemoveConnection(local_addr_, remote_addr_);
204      }
205      // Cancel potential connects
206      MessageList msgs;
207      if (server_->msg_queue_) {
208        server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
209      }
210      for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
211        ASSERT(NULL != it->pdata);
212        MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
213
214        // Lookup remote side.
215        VirtualSocket* socket = server_->LookupConnection(local_addr_,
216                                                          data->addr);
217        if (socket) {
218          // Server socket, remote side is a socket retreived by
219          // accept. Accepted sockets are not bound so we will not
220          // find it by looking in the bindings table.
221          server_->Disconnect(socket);
222          server_->RemoveConnection(local_addr_, data->addr);
223        } else {
224          server_->Disconnect(server_->LookupBinding(data->addr));
225        }
226        delete data;
227      }
228      // Clear incoming packets and disconnect messages
229      if (server_->msg_queue_) {
230        server_->msg_queue_->Clear(this);
231      }
232    }
233
234    state_ = CS_CLOSED;
235    local_addr_.Clear();
236    remote_addr_.Clear();
237    return 0;
238  }
239
240  virtual int Send(const void *pv, size_t cb) {
241    if (CS_CONNECTED != state_) {
242      error_ = ENOTCONN;
243      return -1;
244    }
245    if (SOCK_DGRAM == type_) {
246      return SendUdp(pv, cb, remote_addr_);
247    } else {
248      return SendTcp(pv, cb);
249    }
250  }
251
252  virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
253    if (SOCK_DGRAM == type_) {
254      return SendUdp(pv, cb, addr);
255    } else {
256      if (CS_CONNECTED != state_) {
257        error_ = ENOTCONN;
258        return -1;
259      }
260      return SendTcp(pv, cb);
261    }
262  }
263
264  virtual int Recv(void *pv, size_t cb) {
265    SocketAddress addr;
266    return RecvFrom(pv, cb, &addr);
267  }
268
269  virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
270    // If we don't have a packet, then either error or wait for one to arrive.
271    if (recv_buffer_.empty()) {
272      if (async_) {
273        error_ = EAGAIN;
274        return -1;
275      }
276      while (recv_buffer_.empty()) {
277        Message msg;
278        server_->msg_queue_->Get(&msg);
279        server_->msg_queue_->Dispatch(&msg);
280      }
281    }
282
283    // Return the packet at the front of the queue.
284    Packet* packet = recv_buffer_.front();
285    size_t data_read = _min(cb, packet->size());
286    std::memcpy(pv, packet->data(), data_read);
287    *paddr = packet->from();
288
289    if (data_read < packet->size()) {
290      packet->Consume(data_read);
291    } else {
292      recv_buffer_.pop_front();
293      delete packet;
294    }
295
296    if (SOCK_STREAM == type_) {
297      bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_);
298      recv_buffer_size_ -= data_read;
299      if (was_full) {
300        VirtualSocket* sender = server_->LookupBinding(remote_addr_);
301        ASSERT(NULL != sender);
302        server_->SendTcp(sender);
303      }
304    }
305
306    return static_cast<int>(data_read);
307  }
308
309  virtual int Listen(int backlog) {
310    ASSERT(SOCK_STREAM == type_);
311    ASSERT(CS_CLOSED == state_);
312    if (local_addr_.IsNil()) {
313      error_ = EINVAL;
314      return -1;
315    }
316    ASSERT(NULL == listen_queue_);
317    listen_queue_ = new ListenQueue;
318    state_ = CS_CONNECTING;
319    return 0;
320  }
321
322  virtual VirtualSocket* Accept(SocketAddress *paddr) {
323    if (NULL == listen_queue_) {
324      error_ = EINVAL;
325      return NULL;
326    }
327    while (!listen_queue_->empty()) {
328      VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_,
329                                                async_);
330
331      // Set the new local address to the same as this server socket.
332      socket->SetLocalAddress(local_addr_);
333      // Sockets made from a socket that 'was Any' need to inherit that.
334      socket->set_was_any(was_any_);
335      SocketAddress remote_addr(listen_queue_->front());
336      int result = socket->InitiateConnect(remote_addr, false);
337      listen_queue_->pop_front();
338      if (result != 0) {
339        delete socket;
340        continue;
341      }
342      socket->CompleteConnect(remote_addr, false);
343      if (paddr) {
344        *paddr = remote_addr;
345      }
346      return socket;
347    }
348    error_ = EWOULDBLOCK;
349    return NULL;
350  }
351
352  virtual int GetError() const {
353    return error_;
354  }
355
356  virtual void SetError(int error) {
357    error_ = error;
358  }
359
360  virtual ConnState GetState() const {
361    return state_;
362  }
363
364  virtual int GetOption(Option opt, int* value) {
365    OptionsMap::const_iterator it = options_map_.find(opt);
366    if (it == options_map_.end()) {
367      return -1;
368    }
369    *value = it->second;
370    return 0;  // 0 is success to emulate getsockopt()
371  }
372
373  virtual int SetOption(Option opt, int value) {
374    options_map_[opt] = value;
375    return 0;  // 0 is success to emulate setsockopt()
376  }
377
378  virtual int EstimateMTU(uint16* mtu) {
379    if (CS_CONNECTED != state_)
380      return ENOTCONN;
381    else
382      return 65536;
383  }
384
385  void OnMessage(Message *pmsg) {
386    if (pmsg->message_id == MSG_ID_PACKET) {
387      //ASSERT(!local_addr_.IsAny());
388      ASSERT(NULL != pmsg->pdata);
389      Packet* packet = static_cast<Packet*>(pmsg->pdata);
390
391      recv_buffer_.push_back(packet);
392
393      if (async_) {
394        SignalReadEvent(this);
395      }
396    } else if (pmsg->message_id == MSG_ID_CONNECT) {
397      ASSERT(NULL != pmsg->pdata);
398      MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
399      if (listen_queue_ != NULL) {
400        listen_queue_->push_back(data->addr);
401        if (async_) {
402          SignalReadEvent(this);
403        }
404      } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
405        CompleteConnect(data->addr, true);
406      } else {
407        LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening";
408        server_->Disconnect(server_->LookupBinding(data->addr));
409      }
410      delete data;
411    } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
412      ASSERT(SOCK_STREAM == type_);
413      if (CS_CLOSED != state_) {
414        int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
415        state_ = CS_CLOSED;
416        remote_addr_.Clear();
417        if (async_) {
418          SignalCloseEvent(this, error);
419        }
420      }
421    } else {
422      ASSERT(false);
423    }
424  }
425
426  bool was_any() { return was_any_; }
427  void set_was_any(bool was_any) { was_any_ = was_any; }
428
429 private:
430  struct NetworkEntry {
431    size_t size;
432    uint32 done_time;
433  };
434
435  typedef std::deque<SocketAddress> ListenQueue;
436  typedef std::deque<NetworkEntry> NetworkQueue;
437  typedef std::vector<char> SendBuffer;
438  typedef std::list<Packet*> RecvBuffer;
439  typedef std::map<Option, int> OptionsMap;
440
441  int InitiateConnect(const SocketAddress& addr, bool use_delay) {
442    if (!remote_addr_.IsNil()) {
443      error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS;
444      return -1;
445    }
446    if (local_addr_.IsNil()) {
447      // If there's no local address set, grab a random one in the correct AF.
448      int result = 0;
449      if (addr.ipaddr().family() == AF_INET) {
450        result = Bind(SocketAddress("0.0.0.0", 0));
451      } else if (addr.ipaddr().family() == AF_INET6) {
452        result = Bind(SocketAddress("::", 0));
453      }
454      if (result != 0) {
455        return result;
456      }
457    }
458    if (type_ == SOCK_DGRAM) {
459      remote_addr_ = addr;
460      state_ = CS_CONNECTED;
461    } else {
462      int result = server_->Connect(this, addr, use_delay);
463      if (result != 0) {
464        error_ = EHOSTUNREACH;
465        return -1;
466      }
467      state_ = CS_CONNECTING;
468    }
469    return 0;
470  }
471
472  void CompleteConnect(const SocketAddress& addr, bool notify) {
473    ASSERT(CS_CONNECTING == state_);
474    remote_addr_ = addr;
475    state_ = CS_CONNECTED;
476    server_->AddConnection(remote_addr_, local_addr_, this);
477    if (async_ && notify) {
478      SignalConnectEvent(this);
479    }
480  }
481
482  int SendUdp(const void* pv, size_t cb, const SocketAddress& addr) {
483    // If we have not been assigned a local port, then get one.
484    if (local_addr_.IsNil()) {
485      local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family());
486      int result = server_->Bind(this, &local_addr_);
487      if (result != 0) {
488        local_addr_.Clear();
489        error_ = EADDRINUSE;
490        return result;
491      }
492    }
493
494    // Send the data in a message to the appropriate socket.
495    return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr);
496  }
497
498  int SendTcp(const void* pv, size_t cb) {
499    size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
500    if (0 == capacity) {
501      write_enabled_ = true;
502      error_ = EWOULDBLOCK;
503      return -1;
504    }
505    size_t consumed = _min(cb, capacity);
506    const char* cpv = static_cast<const char*>(pv);
507    send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed);
508    server_->SendTcp(this);
509    return static_cast<int>(consumed);
510  }
511
512  VirtualSocketServer* server_;
513  int family_;
514  int type_;
515  bool async_;
516  ConnState state_;
517  int error_;
518  SocketAddress local_addr_;
519  SocketAddress remote_addr_;
520
521  // Pending sockets which can be Accepted
522  ListenQueue* listen_queue_;
523
524  // Data which tcp has buffered for sending
525  SendBuffer send_buffer_;
526  bool write_enabled_;
527
528  // Critical section to protect the recv_buffer and queue_
529  CriticalSection crit_;
530
531  // Network model that enforces bandwidth and capacity constraints
532  NetworkQueue network_;
533  size_t network_size_;
534
535  // Data which has been received from the network
536  RecvBuffer recv_buffer_;
537  // The amount of data which is in flight or in recv_buffer_
538  size_t recv_buffer_size_;
539
540  // Is this socket bound?
541  bool bound_;
542
543  // When we bind a socket to Any, VSS's Bind gives it another address. For
544  // dual-stack sockets, we want to distinguish between sockets that were
545  // explicitly given a particular address and sockets that had one picked
546  // for them by VSS.
547  bool was_any_;
548
549  // Store the options that are set
550  OptionsMap options_map_;
551
552  friend class VirtualSocketServer;
553};
554
555VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
556    : server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false),
557      network_delay_(Time()), next_ipv4_(kInitialNextIPv4),
558      next_ipv6_(kInitialNextIPv6), next_port_(kFirstEphemeralPort),
559      bindings_(new AddressMap()), connections_(new ConnectionMap()),
560      bandwidth_(0), network_capacity_(kDefaultNetworkCapacity),
561      send_buffer_capacity_(kDefaultTcpBufferSize),
562      recv_buffer_capacity_(kDefaultTcpBufferSize),
563      delay_mean_(0), delay_stddev_(0), delay_samples_(NUM_SAMPLES),
564      delay_dist_(NULL), drop_prob_(0.0) {
565  if (!server_) {
566    server_ = new PhysicalSocketServer();
567    server_owned_ = true;
568  }
569  UpdateDelayDistribution();
570}
571
572VirtualSocketServer::~VirtualSocketServer() {
573  delete bindings_;
574  delete connections_;
575  delete delay_dist_;
576  if (server_owned_) {
577    delete server_;
578  }
579}
580
581IPAddress VirtualSocketServer::GetNextIP(int family) {
582  if (family == AF_INET) {
583    IPAddress next_ip(next_ipv4_);
584    next_ipv4_.s_addr =
585        HostToNetwork32(NetworkToHost32(next_ipv4_.s_addr) + 1);
586    return next_ip;
587  } else if (family == AF_INET6) {
588    IPAddress next_ip(next_ipv6_);
589    uint32* as_ints = reinterpret_cast<uint32*>(&next_ipv6_.s6_addr);
590    as_ints[3] += 1;
591    return next_ip;
592  }
593  return IPAddress();
594}
595
596uint16 VirtualSocketServer::GetNextPort() {
597  uint16 port = next_port_;
598  if (next_port_ < kLastEphemeralPort) {
599    ++next_port_;
600  } else {
601    next_port_ = kFirstEphemeralPort;
602  }
603  return port;
604}
605
606Socket* VirtualSocketServer::CreateSocket(int type) {
607  return CreateSocket(AF_INET, type);
608}
609
610Socket* VirtualSocketServer::CreateSocket(int family, int type) {
611  return CreateSocketInternal(family, type);
612}
613
614AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {
615  return CreateAsyncSocket(AF_INET, type);
616}
617
618AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int family, int type) {
619  return CreateSocketInternal(family, type);
620}
621
622VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) {
623  return new VirtualSocket(this, family, type, true);
624}
625
626void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) {
627  msg_queue_ = msg_queue;
628  if (msg_queue_) {
629    msg_queue_->SignalQueueDestroyed.connect(this,
630        &VirtualSocketServer::OnMessageQueueDestroyed);
631  }
632}
633
634bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
635  ASSERT(msg_queue_ == Thread::Current());
636  if (stop_on_idle_ && Thread::Current()->empty()) {
637    return false;
638  }
639  return socketserver()->Wait(cmsWait, process_io);
640}
641
642void VirtualSocketServer::WakeUp() {
643  socketserver()->WakeUp();
644}
645
646bool VirtualSocketServer::ProcessMessagesUntilIdle() {
647  ASSERT(msg_queue_ == Thread::Current());
648  stop_on_idle_ = true;
649  while (!msg_queue_->empty()) {
650    Message msg;
651    if (msg_queue_->Get(&msg, kForever)) {
652      msg_queue_->Dispatch(&msg);
653    }
654  }
655  stop_on_idle_ = false;
656  return !msg_queue_->IsQuitting();
657}
658
659int VirtualSocketServer::Bind(VirtualSocket* socket,
660                              const SocketAddress& addr) {
661  ASSERT(NULL != socket);
662  // Address must be completely specified at this point
663  ASSERT(!IPIsUnspec(addr.ipaddr()));
664  ASSERT(addr.port() != 0);
665
666  // Normalize the address (turns v6-mapped addresses into v4-addresses).
667  SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
668
669  AddressMap::value_type entry(normalized, socket);
670  return bindings_->insert(entry).second ? 0 : -1;
671}
672
673int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
674  ASSERT(NULL != socket);
675
676  if (IPIsAny(addr->ipaddr())) {
677    addr->SetIP(GetNextIP(addr->ipaddr().family()));
678  } else if (!IPIsUnspec(addr->ipaddr())) {
679    addr->SetIP(addr->ipaddr().Normalized());
680  } else {
681    ASSERT(false);
682  }
683
684  if (addr->port() == 0) {
685    for (int i = 0; i < kEphemeralPortCount; ++i) {
686      addr->SetPort(GetNextPort());
687      if (bindings_->find(*addr) == bindings_->end()) {
688        break;
689      }
690    }
691  }
692
693  return Bind(socket, *addr);
694}
695
696VirtualSocket* VirtualSocketServer::LookupBinding(const SocketAddress& addr) {
697  SocketAddress normalized(addr.ipaddr().Normalized(),
698                           addr.port());
699  AddressMap::iterator it = bindings_->find(normalized);
700  return (bindings_->end() != it) ? it->second : NULL;
701}
702
703int VirtualSocketServer::Unbind(const SocketAddress& addr,
704                                VirtualSocket* socket) {
705  SocketAddress normalized(addr.ipaddr().Normalized(),
706                           addr.port());
707  ASSERT((*bindings_)[normalized] == socket);
708  bindings_->erase(bindings_->find(normalized));
709  return 0;
710}
711
712void VirtualSocketServer::AddConnection(const SocketAddress& local,
713                                        const SocketAddress& remote,
714                                        VirtualSocket* remote_socket) {
715  // Add this socket pair to our routing table. This will allow
716  // multiple clients to connect to the same server address.
717  SocketAddress local_normalized(local.ipaddr().Normalized(),
718                                 local.port());
719  SocketAddress remote_normalized(remote.ipaddr().Normalized(),
720                                  remote.port());
721  SocketAddressPair address_pair(local_normalized, remote_normalized);
722  connections_->insert(std::pair<SocketAddressPair,
723                       VirtualSocket*>(address_pair, remote_socket));
724}
725
726VirtualSocket* VirtualSocketServer::LookupConnection(
727    const SocketAddress& local,
728    const SocketAddress& remote) {
729  SocketAddress local_normalized(local.ipaddr().Normalized(),
730                                 local.port());
731  SocketAddress remote_normalized(remote.ipaddr().Normalized(),
732                                  remote.port());
733  SocketAddressPair address_pair(local_normalized, remote_normalized);
734  ConnectionMap::iterator it = connections_->find(address_pair);
735  return (connections_->end() != it) ? it->second : NULL;
736}
737
738void VirtualSocketServer::RemoveConnection(const SocketAddress& local,
739                                           const SocketAddress& remote) {
740  SocketAddress local_normalized(local.ipaddr().Normalized(),
741                                local.port());
742  SocketAddress remote_normalized(remote.ipaddr().Normalized(),
743                                 remote.port());
744  SocketAddressPair address_pair(local_normalized, remote_normalized);
745  connections_->erase(address_pair);
746}
747
// Returns a pseudo-random value in [0, 1] drawn from rand().
static double Random() {
  const double scale = RAND_MAX;
  return rand() / scale;
}
751
752int VirtualSocketServer::Connect(VirtualSocket* socket,
753                                 const SocketAddress& remote_addr,
754                                 bool use_delay) {
755  uint32 delay = use_delay ? GetRandomTransitDelay() : 0;
756  VirtualSocket* remote = LookupBinding(remote_addr);
757  if (!CanInteractWith(socket, remote)) {
758    LOG(LS_INFO) << "Address family mismatch between "
759                 << socket->GetLocalAddress() << " and " << remote_addr;
760    return -1;
761  }
762  if (remote != NULL) {
763    SocketAddress addr = socket->GetLocalAddress();
764    msg_queue_->PostDelayed(delay, remote, MSG_ID_CONNECT,
765                            new MessageAddress(addr));
766  } else {
767    LOG(LS_INFO) << "No one listening at " << remote_addr;
768    msg_queue_->PostDelayed(delay, socket, MSG_ID_DISCONNECT);
769  }
770  return 0;
771}
772
773bool VirtualSocketServer::Disconnect(VirtualSocket* socket) {
774  if (socket) {
775    // Remove the mapping.
776    msg_queue_->Post(socket, MSG_ID_DISCONNECT);
777    return true;
778  }
779  return false;
780}
781
782int VirtualSocketServer::SendUdp(VirtualSocket* socket,
783                                 const char* data, size_t data_size,
784                                 const SocketAddress& remote_addr) {
785  // See if we want to drop this packet.
786  if (Random() < drop_prob_) {
787    LOG(LS_VERBOSE) << "Dropping packet: bad luck";
788    return static_cast<int>(data_size);
789  }
790
791  VirtualSocket* recipient = LookupBinding(remote_addr);
792  if (!recipient) {
793    // Make a fake recipient for address family checking.
794    scoped_ptr<VirtualSocket> dummy_socket(
795        CreateSocketInternal(AF_INET, SOCK_DGRAM));
796    dummy_socket->SetLocalAddress(remote_addr);
797    if (!CanInteractWith(socket, dummy_socket.get())) {
798      LOG(LS_VERBOSE) << "Incompatible address families: "
799                      << socket->GetLocalAddress() << " and " << remote_addr;
800      return -1;
801    }
802    LOG(LS_VERBOSE) << "No one listening at " << remote_addr;
803    return static_cast<int>(data_size);
804  }
805
806  if (!CanInteractWith(socket, recipient)) {
807    LOG(LS_VERBOSE) << "Incompatible address families: "
808                    << socket->GetLocalAddress() << " and " << remote_addr;
809    return -1;
810  }
811
812  CritScope cs(&socket->crit_);
813
814  uint32 cur_time = Time();
815  PurgeNetworkPackets(socket, cur_time);
816
817  // Determine whether we have enough bandwidth to accept this packet.  To do
818  // this, we need to update the send queue.  Once we know it's current size,
819  // we know whether we can fit this packet.
820  //
821  // NOTE: There are better algorithms for maintaining such a queue (such as
822  // "Derivative Random Drop"); however, this algorithm is a more accurate
823  // simulation of what a normal network would do.
824
825  size_t packet_size = data_size + UDP_HEADER_SIZE;
826  if (socket->network_size_ + packet_size > network_capacity_) {
827    LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
828    return static_cast<int>(data_size);
829  }
830
831  AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
832                     UDP_HEADER_SIZE, false);
833
834  return static_cast<int>(data_size);
835}
836
837void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
838  // TCP can't send more data than will fill up the receiver's buffer.
839  // We track the data that is in the buffer plus data in flight using the
840  // recipient's recv_buffer_size_.  Anything beyond that must be stored in the
841  // sender's buffer.  We will trigger the buffered data to be sent when data
842  // is read from the recv_buffer.
843
844  // Lookup the local/remote pair in the connections table.
845  VirtualSocket* recipient = LookupConnection(socket->local_addr_,
846                                              socket->remote_addr_);
847  if (!recipient) {
848    LOG(LS_VERBOSE) << "Sending data to no one.";
849    return;
850  }
851
852  CritScope cs(&socket->crit_);
853
854  uint32 cur_time = Time();
855  PurgeNetworkPackets(socket, cur_time);
856
857  while (true) {
858    size_t available = recv_buffer_capacity_ - recipient->recv_buffer_size_;
859    size_t max_data_size = _min<size_t>(available, TCP_MSS - TCP_HEADER_SIZE);
860    size_t data_size = _min(socket->send_buffer_.size(), max_data_size);
861    if (0 == data_size)
862      break;
863
864    AddPacketToNetwork(socket, recipient, cur_time, &socket->send_buffer_[0],
865                       data_size, TCP_HEADER_SIZE, true);
866    recipient->recv_buffer_size_ += data_size;
867
868    size_t new_buffer_size = socket->send_buffer_.size() - data_size;
869    // Avoid undefined access beyond the last element of the vector.
870    // This only happens when new_buffer_size is 0.
871    if (data_size < socket->send_buffer_.size()) {
872      // memmove is required for potentially overlapping source/destination.
873      memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size],
874              new_buffer_size);
875    }
876    socket->send_buffer_.resize(new_buffer_size);
877  }
878
879  if (socket->write_enabled_
880      && (socket->send_buffer_.size() < send_buffer_capacity_)) {
881    socket->write_enabled_ = false;
882    socket->SignalWriteEvent(socket);
883  }
884}
885
886void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
887                                             VirtualSocket* recipient,
888                                             uint32 cur_time,
889                                             const char* data,
890                                             size_t data_size,
891                                             size_t header_size,
892                                             bool ordered) {
893  VirtualSocket::NetworkEntry entry;
894  entry.size = data_size + header_size;
895
896  sender->network_size_ += entry.size;
897  uint32 send_delay = SendDelay(static_cast<uint32>(sender->network_size_));
898  entry.done_time = cur_time + send_delay;
899  sender->network_.push_back(entry);
900
901  // Find the delay for crossing the many virtual hops of the network.
902  uint32 transit_delay = GetRandomTransitDelay();
903
904  // Post the packet as a message to be delivered (on our own thread)
905  Packet* p = new Packet(data, data_size, sender->local_addr_);
906  uint32 ts = TimeAfter(send_delay + transit_delay);
907  if (ordered) {
908    // Ensure that new packets arrive after previous ones
909    // TODO: consider ordering on a per-socket basis, since this
910    // introduces artifical delay.
911    ts = TimeMax(ts, network_delay_);
912  }
913  msg_queue_->PostAt(ts, recipient, MSG_ID_PACKET, p);
914  network_delay_ = TimeMax(ts, network_delay_);
915}
916
917void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
918                                              uint32 cur_time) {
919  while (!socket->network_.empty() &&
920         (socket->network_.front().done_time <= cur_time)) {
921    ASSERT(socket->network_size_ >= socket->network_.front().size);
922    socket->network_size_ -= socket->network_.front().size;
923    socket->network_.pop_front();
924  }
925}
926
927uint32 VirtualSocketServer::SendDelay(uint32 size) {
928  if (bandwidth_ == 0)
929    return 0;
930  else
931    return 1000 * size / bandwidth_;
932}
933
#if 0
// Debug helper: prints each (x, y) point of |f| to stdout, then the mean and
// population standard deviation of the y values. It is disabled twice over:
// the whole definition is compiled out, and the leading `return;` would skip
// the body even if it were re-enabled.
void PrintFunction(std::vector<std::pair<double, double> >* f) {
  return;
  double sum = 0;
  for (uint32 i = 0; i < f->size(); ++i) {
    std::cout << (*f)[i].first << '\t' << (*f)[i].second << std::endl;
    sum += (*f)[i].second;
  }
  if (!f->empty()) {
    const double mean = sum / f->size();
    double sum_sq_dev = 0;
    for (uint32 i = 0; i < f->size(); ++i) {
      double dev = (*f)[i].second - mean;
      sum_sq_dev += dev * dev;
    }
    // Population (divide-by-N) standard deviation of the y values.
    std::cout << "Mean = " << mean << " StdDev = "
              << sqrt(sum_sq_dev / f->size()) << std::endl;
  }
}
#endif  // <unused>
954
955void VirtualSocketServer::UpdateDelayDistribution() {
956  Function* dist = CreateDistribution(delay_mean_, delay_stddev_,
957                                      delay_samples_);
958  // We take a lock just to make sure we don't leak memory.
959  {
960    CritScope cs(&delay_crit_);
961    delete delay_dist_;
962    delay_dist_ = dist;
963  }
964}
965
// Pi, derived from atan(1) == pi / 4. Const: it is only ever read.
static const double PI = 4 * std::atan(1.0);
967
968static double Normal(double x, double mean, double stddev) {
969  double a = (x - mean) * (x - mean) / (2 * stddev * stddev);
970  return std::exp(-a) / (stddev * sqrt(2 * PI));
971}
972
#if 0  // static unused gives a warning
// Pareto probability density with scale |min| and shape |k|: zero for
// x < min, otherwise k * min^k / x^(k + 1). Kept for reference; not compiled.
static double Pareto(double x, double min, double k) {
  if (x < min)
    return 0;
  else
    return k * std::pow(min, k) / std::pow(x, k+1);
}
#endif
981
982VirtualSocketServer::Function* VirtualSocketServer::CreateDistribution(
983    uint32 mean, uint32 stddev, uint32 samples) {
984  Function* f = new Function();
985
986  if (0 == stddev) {
987    f->push_back(Point(mean, 1.0));
988  } else {
989    double start = 0;
990    if (mean >= 4 * static_cast<double>(stddev))
991      start = mean - 4 * static_cast<double>(stddev);
992    double end = mean + 4 * static_cast<double>(stddev);
993
994    for (uint32 i = 0; i < samples; i++) {
995      double x = start + (end - start) * i / (samples - 1);
996      double y = Normal(x, mean, stddev);
997      f->push_back(Point(x, y));
998    }
999  }
1000  return Resample(Invert(Accumulate(f)), 0, 1, samples);
1001}
1002
1003uint32 VirtualSocketServer::GetRandomTransitDelay() {
1004  size_t index = rand() % delay_dist_->size();
1005  double delay = (*delay_dist_)[index].second;
1006  //LOG_F(LS_INFO) << "random[" << index << "] = " << delay;
1007  return static_cast<uint32>(delay);
1008}
1009
1010struct FunctionDomainCmp {
1011  bool operator()(const VirtualSocketServer::Point& p1,
1012                   const VirtualSocketServer::Point& p2) {
1013    return p1.first < p2.first;
1014  }
1015  bool operator()(double v1, const VirtualSocketServer::Point& p2) {
1016    return v1 < p2.first;
1017  }
1018  bool operator()(const VirtualSocketServer::Point& p1, double v2) {
1019    return p1.first < v2;
1020  }
1021};
1022
1023VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
1024  ASSERT(f->size() >= 1);
1025  double v = 0;
1026  for (Function::size_type i = 0; i < f->size() - 1; ++i) {
1027    double dx = (*f)[i + 1].first - (*f)[i].first;
1028    double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2;
1029    (*f)[i].second = v;
1030    v = v + dx * avgy;
1031  }
1032  (*f)[f->size()-1].second = v;
1033  return f;
1034}
1035
1036VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) {
1037  for (Function::size_type i = 0; i < f->size(); ++i)
1038    std::swap((*f)[i].first, (*f)[i].second);
1039
1040  std::sort(f->begin(), f->end(), FunctionDomainCmp());
1041  return f;
1042}
1043
1044VirtualSocketServer::Function* VirtualSocketServer::Resample(
1045    Function* f, double x1, double x2, uint32 samples) {
1046  Function* g = new Function();
1047
1048  for (size_t i = 0; i < samples; i++) {
1049    double x = x1 + (x2 - x1) * i / (samples - 1);
1050    double y = Evaluate(f, x);
1051    g->push_back(Point(x, y));
1052  }
1053
1054  delete f;
1055  return g;
1056}
1057
1058double VirtualSocketServer::Evaluate(Function* f, double x) {
1059  Function::iterator iter =
1060      std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());
1061  if (iter == f->begin()) {
1062    return (*f)[0].second;
1063  } else if (iter == f->end()) {
1064    ASSERT(f->size() >= 1);
1065    return (*f)[f->size() - 1].second;
1066  } else if (iter->first == x) {
1067    return iter->second;
1068  } else {
1069    double x1 = (iter - 1)->first;
1070    double y1 = (iter - 1)->second;
1071    double x2 = iter->first;
1072    double y2 = iter->second;
1073    return y1 + (y2 - y1) * (x - x1) / (x2 - x1);
1074  }
1075}
1076
1077bool VirtualSocketServer::CanInteractWith(VirtualSocket* local,
1078                                          VirtualSocket* remote) {
1079  if (!local || !remote) {
1080    return false;
1081  }
1082  IPAddress local_ip = local->GetLocalAddress().ipaddr();
1083  IPAddress remote_ip = remote->GetLocalAddress().ipaddr();
1084  IPAddress local_normalized = local_ip.Normalized();
1085  IPAddress remote_normalized = remote_ip.Normalized();
1086  // Check if the addresses are the same family after Normalization (turns
1087  // mapped IPv6 address into IPv4 addresses).
1088  // This will stop unmapped V6 addresses from talking to mapped V6 addresses.
1089  if (local_normalized.family() == remote_normalized.family()) {
1090    return true;
1091  }
1092
1093  // If ip1 is IPv4 and ip2 is :: and ip2 is not IPV6_V6ONLY.
1094  int remote_v6_only = 0;
1095  remote->GetOption(Socket::OPT_IPV6_V6ONLY, &remote_v6_only);
1096  if (local_ip.family() == AF_INET && !remote_v6_only && IPIsAny(remote_ip)) {
1097    return true;
1098  }
1099  // Same check, backwards.
1100  int local_v6_only = 0;
1101  local->GetOption(Socket::OPT_IPV6_V6ONLY, &local_v6_only);
1102  if (remote_ip.family() == AF_INET && !local_v6_only && IPIsAny(local_ip)) {
1103    return true;
1104  }
1105
1106  // Check to see if either socket was explicitly bound to IPv6-any.
1107  // These sockets can talk with anyone.
1108  if (local_ip.family() == AF_INET6 && local->was_any()) {
1109    return true;
1110  }
1111  if (remote_ip.family() == AF_INET6 && remote->was_any()) {
1112    return true;
1113  }
1114
1115  return false;
1116}
1117
1118}  // namespace talk_base
1119