1/*
2 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/virtualsocketserver.h"
12
13#include <errno.h>
14#include <math.h>
15
16#include <algorithm>
17#include <map>
18#include <vector>
19
20#include "webrtc/base/common.h"
21#include "webrtc/base/logging.h"
22#include "webrtc/base/physicalsocketserver.h"
23#include "webrtc/base/socketaddresspair.h"
24#include "webrtc/base/thread.h"
25#include "webrtc/base/timeutils.h"
26
27namespace rtc {
// First IPv4 address GetNextIP() hands out when a socket binds to the
// wildcard address.  Windows' in_addr has a different layout, so it needs a
// differently shaped initializer.
#if defined(WEBRTC_WIN)
const in_addr kInitialNextIPv4 = { {0x01, 0, 0, 0} };
#else
// This value is entirely arbitrary, hence the lack of concern about endianness.
const in_addr kInitialNextIPv4 = { 0x01000000 };
#endif
// First IPv6 address GetNextIP() hands out for wildcard binds.
// Starts at ::2 so as to not cause confusion with ::1.
const in6_addr kInitialNextIPv6 = { { {
      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2
    } } };

// Ports assigned to sockets bound with port 0 cycle through this range
// (49152-65535, the IANA dynamic/private range).
const uint16 kFirstEphemeralPort = 49152;
const uint16 kLastEphemeralPort = 65535;
const uint16 kEphemeralPortCount = kLastEphemeralPort - kFirstEphemeralPort + 1;
// Default per-sender limit on bytes in flight; UDP packets beyond it are
// dropped (see VirtualSocketServer::SendUdp).
const uint32 kDefaultNetworkCapacity = 64 * 1024;
// Default size of each TCP socket's send and receive buffers.
const uint32 kDefaultTcpBufferSize = 32 * 1024;

// Simulated per-packet header overhead, counted against the network model.
const uint32 UDP_HEADER_SIZE = 28;  // IP + UDP headers
const uint32 TCP_HEADER_SIZE = 40;  // IP + TCP headers
// TCP payload per packet is capped at TCP_MSS - TCP_HEADER_SIZE bytes.
const uint32 TCP_MSS = 1400;  // Maximum segment size

// Default number of samples used to build the transit-delay distribution.
// Note: The current algorithm doesn't work for sample sizes smaller than this.
const int NUM_SAMPLES = 1000;

// Message ids posted to the server's message queue and handled in
// VirtualSocket::OnMessage().
enum {
  MSG_ID_PACKET,
  MSG_ID_CONNECT,
  MSG_ID_DISCONNECT,
};
57
58// Packets are passed between sockets as messages.  We copy the data just like
59// the kernel does.
60class Packet : public MessageData {
61 public:
62  Packet(const char* data, size_t size, const SocketAddress& from)
63        : size_(size), consumed_(0), from_(from) {
64    ASSERT(NULL != data);
65    data_ = new char[size_];
66    memcpy(data_, data, size_);
67  }
68
69  virtual ~Packet() {
70    delete[] data_;
71  }
72
73  const char* data() const { return data_ + consumed_; }
74  size_t size() const { return size_ - consumed_; }
75  const SocketAddress& from() const { return from_; }
76
77  // Remove the first size bytes from the data.
78  void Consume(size_t size) {
79    ASSERT(size + consumed_ < size_);
80    consumed_ += size;
81  }
82
83 private:
84  char* data_;
85  size_t size_, consumed_;
86  SocketAddress from_;
87};
88
89struct MessageAddress : public MessageData {
90  explicit MessageAddress(const SocketAddress& a) : addr(a) { }
91  SocketAddress addr;
92};
93
94// Implements the socket interface using the virtual network.  Packets are
95// passed as messages using the message queue of the socket server.
96class VirtualSocket : public AsyncSocket, public MessageHandler {
97 public:
98  VirtualSocket(VirtualSocketServer* server, int family, int type, bool async)
99      : server_(server), family_(family), type_(type), async_(async),
100        state_(CS_CLOSED), error_(0), listen_queue_(NULL),
101        write_enabled_(false),
102        network_size_(0), recv_buffer_size_(0), bound_(false), was_any_(false) {
103    ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
104    ASSERT(async_ || (type_ != SOCK_STREAM));  // We only support async streams
105  }
106
107  virtual ~VirtualSocket() {
108    Close();
109
110    for (RecvBuffer::iterator it = recv_buffer_.begin();
111         it != recv_buffer_.end(); ++it) {
112      delete *it;
113    }
114  }
115
116  virtual SocketAddress GetLocalAddress() const {
117    return local_addr_;
118  }
119
120  virtual SocketAddress GetRemoteAddress() const {
121    return remote_addr_;
122  }
123
124  // Used by server sockets to set the local address without binding.
125  void SetLocalAddress(const SocketAddress& addr) {
126    local_addr_ = addr;
127  }
128
129  virtual int Bind(const SocketAddress& addr) {
130    if (!local_addr_.IsNil()) {
131      error_ = EINVAL;
132      return -1;
133    }
134    local_addr_ = addr;
135    int result = server_->Bind(this, &local_addr_);
136    if (result != 0) {
137      local_addr_.Clear();
138      error_ = EADDRINUSE;
139    } else {
140      bound_ = true;
141      was_any_ = addr.IsAnyIP();
142    }
143    return result;
144  }
145
146  virtual int Connect(const SocketAddress& addr) {
147    return InitiateConnect(addr, true);
148  }
149
150  virtual int Close() {
151    if (!local_addr_.IsNil() && bound_) {
152      // Remove from the binding table.
153      server_->Unbind(local_addr_, this);
154      bound_ = false;
155    }
156
157    if (SOCK_STREAM == type_) {
158      // Cancel pending sockets
159      if (listen_queue_) {
160        while (!listen_queue_->empty()) {
161          SocketAddress addr = listen_queue_->front();
162
163          // Disconnect listening socket.
164          server_->Disconnect(server_->LookupBinding(addr));
165          listen_queue_->pop_front();
166        }
167        delete listen_queue_;
168        listen_queue_ = NULL;
169      }
170      // Disconnect stream sockets
171      if (CS_CONNECTED == state_) {
172        // Disconnect remote socket, check if it is a child of a server socket.
173        VirtualSocket* socket =
174            server_->LookupConnection(local_addr_, remote_addr_);
175        if (!socket) {
176          // Not a server socket child, then see if it is bound.
177          // TODO: If this is indeed a server socket that has no
178          // children this will cause the server socket to be
179          // closed. This might lead to unexpected results, how to fix this?
180          socket = server_->LookupBinding(remote_addr_);
181        }
182        server_->Disconnect(socket);
183
184        // Remove mapping for both directions.
185        server_->RemoveConnection(remote_addr_, local_addr_);
186        server_->RemoveConnection(local_addr_, remote_addr_);
187      }
188      // Cancel potential connects
189      MessageList msgs;
190      if (server_->msg_queue_) {
191        server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
192      }
193      for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
194        ASSERT(NULL != it->pdata);
195        MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
196
197        // Lookup remote side.
198        VirtualSocket* socket = server_->LookupConnection(local_addr_,
199                                                          data->addr);
200        if (socket) {
201          // Server socket, remote side is a socket retreived by
202          // accept. Accepted sockets are not bound so we will not
203          // find it by looking in the bindings table.
204          server_->Disconnect(socket);
205          server_->RemoveConnection(local_addr_, data->addr);
206        } else {
207          server_->Disconnect(server_->LookupBinding(data->addr));
208        }
209        delete data;
210      }
211      // Clear incoming packets and disconnect messages
212      if (server_->msg_queue_) {
213        server_->msg_queue_->Clear(this);
214      }
215    }
216
217    state_ = CS_CLOSED;
218    local_addr_.Clear();
219    remote_addr_.Clear();
220    return 0;
221  }
222
223  virtual int Send(const void *pv, size_t cb) {
224    if (CS_CONNECTED != state_) {
225      error_ = ENOTCONN;
226      return -1;
227    }
228    if (SOCK_DGRAM == type_) {
229      return SendUdp(pv, cb, remote_addr_);
230    } else {
231      return SendTcp(pv, cb);
232    }
233  }
234
235  virtual int SendTo(const void *pv, size_t cb, const SocketAddress& addr) {
236    if (SOCK_DGRAM == type_) {
237      return SendUdp(pv, cb, addr);
238    } else {
239      if (CS_CONNECTED != state_) {
240        error_ = ENOTCONN;
241        return -1;
242      }
243      return SendTcp(pv, cb);
244    }
245  }
246
247  virtual int Recv(void *pv, size_t cb) {
248    SocketAddress addr;
249    return RecvFrom(pv, cb, &addr);
250  }
251
252  virtual int RecvFrom(void *pv, size_t cb, SocketAddress *paddr) {
253    // If we don't have a packet, then either error or wait for one to arrive.
254    if (recv_buffer_.empty()) {
255      if (async_) {
256        error_ = EAGAIN;
257        return -1;
258      }
259      while (recv_buffer_.empty()) {
260        Message msg;
261        server_->msg_queue_->Get(&msg);
262        server_->msg_queue_->Dispatch(&msg);
263      }
264    }
265
266    // Return the packet at the front of the queue.
267    Packet* packet = recv_buffer_.front();
268    size_t data_read = _min(cb, packet->size());
269    memcpy(pv, packet->data(), data_read);
270    *paddr = packet->from();
271
272    if (data_read < packet->size()) {
273      packet->Consume(data_read);
274    } else {
275      recv_buffer_.pop_front();
276      delete packet;
277    }
278
279    if (SOCK_STREAM == type_) {
280      bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_);
281      recv_buffer_size_ -= data_read;
282      if (was_full) {
283        VirtualSocket* sender = server_->LookupBinding(remote_addr_);
284        ASSERT(NULL != sender);
285        server_->SendTcp(sender);
286      }
287    }
288
289    return static_cast<int>(data_read);
290  }
291
292  virtual int Listen(int backlog) {
293    ASSERT(SOCK_STREAM == type_);
294    ASSERT(CS_CLOSED == state_);
295    if (local_addr_.IsNil()) {
296      error_ = EINVAL;
297      return -1;
298    }
299    ASSERT(NULL == listen_queue_);
300    listen_queue_ = new ListenQueue;
301    state_ = CS_CONNECTING;
302    return 0;
303  }
304
305  virtual VirtualSocket* Accept(SocketAddress *paddr) {
306    if (NULL == listen_queue_) {
307      error_ = EINVAL;
308      return NULL;
309    }
310    while (!listen_queue_->empty()) {
311      VirtualSocket* socket = new VirtualSocket(server_, AF_INET, type_,
312                                                async_);
313
314      // Set the new local address to the same as this server socket.
315      socket->SetLocalAddress(local_addr_);
316      // Sockets made from a socket that 'was Any' need to inherit that.
317      socket->set_was_any(was_any_);
318      SocketAddress remote_addr(listen_queue_->front());
319      int result = socket->InitiateConnect(remote_addr, false);
320      listen_queue_->pop_front();
321      if (result != 0) {
322        delete socket;
323        continue;
324      }
325      socket->CompleteConnect(remote_addr, false);
326      if (paddr) {
327        *paddr = remote_addr;
328      }
329      return socket;
330    }
331    error_ = EWOULDBLOCK;
332    return NULL;
333  }
334
335  virtual int GetError() const {
336    return error_;
337  }
338
339  virtual void SetError(int error) {
340    error_ = error;
341  }
342
343  virtual ConnState GetState() const {
344    return state_;
345  }
346
347  virtual int GetOption(Option opt, int* value) {
348    OptionsMap::const_iterator it = options_map_.find(opt);
349    if (it == options_map_.end()) {
350      return -1;
351    }
352    *value = it->second;
353    return 0;  // 0 is success to emulate getsockopt()
354  }
355
356  virtual int SetOption(Option opt, int value) {
357    options_map_[opt] = value;
358    return 0;  // 0 is success to emulate setsockopt()
359  }
360
361  virtual int EstimateMTU(uint16* mtu) {
362    if (CS_CONNECTED != state_)
363      return ENOTCONN;
364    else
365      return 65536;
366  }
367
368  void OnMessage(Message *pmsg) {
369    if (pmsg->message_id == MSG_ID_PACKET) {
370      //ASSERT(!local_addr_.IsAny());
371      ASSERT(NULL != pmsg->pdata);
372      Packet* packet = static_cast<Packet*>(pmsg->pdata);
373
374      recv_buffer_.push_back(packet);
375
376      if (async_) {
377        SignalReadEvent(this);
378      }
379    } else if (pmsg->message_id == MSG_ID_CONNECT) {
380      ASSERT(NULL != pmsg->pdata);
381      MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
382      if (listen_queue_ != NULL) {
383        listen_queue_->push_back(data->addr);
384        if (async_) {
385          SignalReadEvent(this);
386        }
387      } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
388        CompleteConnect(data->addr, true);
389      } else {
390        LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening";
391        server_->Disconnect(server_->LookupBinding(data->addr));
392      }
393      delete data;
394    } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
395      ASSERT(SOCK_STREAM == type_);
396      if (CS_CLOSED != state_) {
397        int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
398        state_ = CS_CLOSED;
399        remote_addr_.Clear();
400        if (async_) {
401          SignalCloseEvent(this, error);
402        }
403      }
404    } else {
405      ASSERT(false);
406    }
407  }
408
409  bool was_any() { return was_any_; }
410  void set_was_any(bool was_any) { was_any_ = was_any; }
411
412 private:
413  struct NetworkEntry {
414    size_t size;
415    uint32 done_time;
416  };
417
418  typedef std::deque<SocketAddress> ListenQueue;
419  typedef std::deque<NetworkEntry> NetworkQueue;
420  typedef std::vector<char> SendBuffer;
421  typedef std::list<Packet*> RecvBuffer;
422  typedef std::map<Option, int> OptionsMap;
423
424  int InitiateConnect(const SocketAddress& addr, bool use_delay) {
425    if (!remote_addr_.IsNil()) {
426      error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS;
427      return -1;
428    }
429    if (local_addr_.IsNil()) {
430      // If there's no local address set, grab a random one in the correct AF.
431      int result = 0;
432      if (addr.ipaddr().family() == AF_INET) {
433        result = Bind(SocketAddress("0.0.0.0", 0));
434      } else if (addr.ipaddr().family() == AF_INET6) {
435        result = Bind(SocketAddress("::", 0));
436      }
437      if (result != 0) {
438        return result;
439      }
440    }
441    if (type_ == SOCK_DGRAM) {
442      remote_addr_ = addr;
443      state_ = CS_CONNECTED;
444    } else {
445      int result = server_->Connect(this, addr, use_delay);
446      if (result != 0) {
447        error_ = EHOSTUNREACH;
448        return -1;
449      }
450      state_ = CS_CONNECTING;
451    }
452    return 0;
453  }
454
455  void CompleteConnect(const SocketAddress& addr, bool notify) {
456    ASSERT(CS_CONNECTING == state_);
457    remote_addr_ = addr;
458    state_ = CS_CONNECTED;
459    server_->AddConnection(remote_addr_, local_addr_, this);
460    if (async_ && notify) {
461      SignalConnectEvent(this);
462    }
463  }
464
465  int SendUdp(const void* pv, size_t cb, const SocketAddress& addr) {
466    // If we have not been assigned a local port, then get one.
467    if (local_addr_.IsNil()) {
468      local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family());
469      int result = server_->Bind(this, &local_addr_);
470      if (result != 0) {
471        local_addr_.Clear();
472        error_ = EADDRINUSE;
473        return result;
474      }
475    }
476
477    // Send the data in a message to the appropriate socket.
478    return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr);
479  }
480
481  int SendTcp(const void* pv, size_t cb) {
482    size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
483    if (0 == capacity) {
484      write_enabled_ = true;
485      error_ = EWOULDBLOCK;
486      return -1;
487    }
488    size_t consumed = _min(cb, capacity);
489    const char* cpv = static_cast<const char*>(pv);
490    send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed);
491    server_->SendTcp(this);
492    return static_cast<int>(consumed);
493  }
494
495  VirtualSocketServer* server_;
496  int family_;
497  int type_;
498  bool async_;
499  ConnState state_;
500  int error_;
501  SocketAddress local_addr_;
502  SocketAddress remote_addr_;
503
504  // Pending sockets which can be Accepted
505  ListenQueue* listen_queue_;
506
507  // Data which tcp has buffered for sending
508  SendBuffer send_buffer_;
509  bool write_enabled_;
510
511  // Critical section to protect the recv_buffer and queue_
512  CriticalSection crit_;
513
514  // Network model that enforces bandwidth and capacity constraints
515  NetworkQueue network_;
516  size_t network_size_;
517
518  // Data which has been received from the network
519  RecvBuffer recv_buffer_;
520  // The amount of data which is in flight or in recv_buffer_
521  size_t recv_buffer_size_;
522
523  // Is this socket bound?
524  bool bound_;
525
526  // When we bind a socket to Any, VSS's Bind gives it another address. For
527  // dual-stack sockets, we want to distinguish between sockets that were
528  // explicitly given a particular address and sockets that had one picked
529  // for them by VSS.
530  bool was_any_;
531
532  // Store the options that are set
533  OptionsMap options_map_;
534
535  friend class VirtualSocketServer;
536};
537
// Creates a virtual network layered over |ss|, the underlying socket server
// (presumably what socketserver() returns for Wait()/WakeUp()).  If |ss| is
// NULL, a PhysicalSocketServer is created here and deleted by the destructor.
// Defaults: unlimited bandwidth (bandwidth_ == 0 gives zero send delay), no
// packet loss, zero mean/stddev transit delay.
VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
    : server_(ss), server_owned_(false), msg_queue_(NULL), stop_on_idle_(false),
      network_delay_(Time()), next_ipv4_(kInitialNextIPv4),
      next_ipv6_(kInitialNextIPv6), next_port_(kFirstEphemeralPort),
      bindings_(new AddressMap()), connections_(new ConnectionMap()),
      bandwidth_(0), network_capacity_(kDefaultNetworkCapacity),
      send_buffer_capacity_(kDefaultTcpBufferSize),
      recv_buffer_capacity_(kDefaultTcpBufferSize),
      delay_mean_(0), delay_stddev_(0), delay_samples_(NUM_SAMPLES),
      delay_dist_(NULL), drop_prob_(0.0) {
  if (!server_) {
    server_ = new PhysicalSocketServer();
    server_owned_ = true;
  }
  // Build the initial transit-delay distribution from the defaults above.
  UpdateDelayDistribution();
}
554
555VirtualSocketServer::~VirtualSocketServer() {
556  delete bindings_;
557  delete connections_;
558  delete delay_dist_;
559  if (server_owned_) {
560    delete server_;
561  }
562}
563
564IPAddress VirtualSocketServer::GetNextIP(int family) {
565  if (family == AF_INET) {
566    IPAddress next_ip(next_ipv4_);
567    next_ipv4_.s_addr =
568        HostToNetwork32(NetworkToHost32(next_ipv4_.s_addr) + 1);
569    return next_ip;
570  } else if (family == AF_INET6) {
571    IPAddress next_ip(next_ipv6_);
572    uint32* as_ints = reinterpret_cast<uint32*>(&next_ipv6_.s6_addr);
573    as_ints[3] += 1;
574    return next_ip;
575  }
576  return IPAddress();
577}
578
579uint16 VirtualSocketServer::GetNextPort() {
580  uint16 port = next_port_;
581  if (next_port_ < kLastEphemeralPort) {
582    ++next_port_;
583  } else {
584    next_port_ = kFirstEphemeralPort;
585  }
586  return port;
587}
588
589Socket* VirtualSocketServer::CreateSocket(int type) {
590  return CreateSocket(AF_INET, type);
591}
592
593Socket* VirtualSocketServer::CreateSocket(int family, int type) {
594  return CreateSocketInternal(family, type);
595}
596
597AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {
598  return CreateAsyncSocket(AF_INET, type);
599}
600
601AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int family, int type) {
602  return CreateSocketInternal(family, type);
603}
604
605VirtualSocket* VirtualSocketServer::CreateSocketInternal(int family, int type) {
606  return new VirtualSocket(this, family, type, true);
607}
608
609void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) {
610  msg_queue_ = msg_queue;
611  if (msg_queue_) {
612    msg_queue_->SignalQueueDestroyed.connect(this,
613        &VirtualSocketServer::OnMessageQueueDestroyed);
614  }
615}
616
617bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
618  ASSERT(msg_queue_ == Thread::Current());
619  if (stop_on_idle_ && Thread::Current()->empty()) {
620    return false;
621  }
622  return socketserver()->Wait(cmsWait, process_io);
623}
624
625void VirtualSocketServer::WakeUp() {
626  socketserver()->WakeUp();
627}
628
629bool VirtualSocketServer::ProcessMessagesUntilIdle() {
630  ASSERT(msg_queue_ == Thread::Current());
631  stop_on_idle_ = true;
632  while (!msg_queue_->empty()) {
633    Message msg;
634    if (msg_queue_->Get(&msg, kForever)) {
635      msg_queue_->Dispatch(&msg);
636    }
637  }
638  stop_on_idle_ = false;
639  return !msg_queue_->IsQuitting();
640}
641
642void VirtualSocketServer::SetNextPortForTesting(uint16 port) {
643  next_port_ = port;
644}
645
646int VirtualSocketServer::Bind(VirtualSocket* socket,
647                              const SocketAddress& addr) {
648  ASSERT(NULL != socket);
649  // Address must be completely specified at this point
650  ASSERT(!IPIsUnspec(addr.ipaddr()));
651  ASSERT(addr.port() != 0);
652
653  // Normalize the address (turns v6-mapped addresses into v4-addresses).
654  SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
655
656  AddressMap::value_type entry(normalized, socket);
657  return bindings_->insert(entry).second ? 0 : -1;
658}
659
660int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
661  ASSERT(NULL != socket);
662
663  if (IPIsAny(addr->ipaddr())) {
664    addr->SetIP(GetNextIP(addr->ipaddr().family()));
665  } else if (!IPIsUnspec(addr->ipaddr())) {
666    addr->SetIP(addr->ipaddr().Normalized());
667  } else {
668    ASSERT(false);
669  }
670
671  if (addr->port() == 0) {
672    for (int i = 0; i < kEphemeralPortCount; ++i) {
673      addr->SetPort(GetNextPort());
674      if (bindings_->find(*addr) == bindings_->end()) {
675        break;
676      }
677    }
678  }
679
680  return Bind(socket, *addr);
681}
682
683VirtualSocket* VirtualSocketServer::LookupBinding(const SocketAddress& addr) {
684  SocketAddress normalized(addr.ipaddr().Normalized(),
685                           addr.port());
686  AddressMap::iterator it = bindings_->find(normalized);
687  return (bindings_->end() != it) ? it->second : NULL;
688}
689
690int VirtualSocketServer::Unbind(const SocketAddress& addr,
691                                VirtualSocket* socket) {
692  SocketAddress normalized(addr.ipaddr().Normalized(),
693                           addr.port());
694  ASSERT((*bindings_)[normalized] == socket);
695  bindings_->erase(bindings_->find(normalized));
696  return 0;
697}
698
699void VirtualSocketServer::AddConnection(const SocketAddress& local,
700                                        const SocketAddress& remote,
701                                        VirtualSocket* remote_socket) {
702  // Add this socket pair to our routing table. This will allow
703  // multiple clients to connect to the same server address.
704  SocketAddress local_normalized(local.ipaddr().Normalized(),
705                                 local.port());
706  SocketAddress remote_normalized(remote.ipaddr().Normalized(),
707                                  remote.port());
708  SocketAddressPair address_pair(local_normalized, remote_normalized);
709  connections_->insert(std::pair<SocketAddressPair,
710                       VirtualSocket*>(address_pair, remote_socket));
711}
712
713VirtualSocket* VirtualSocketServer::LookupConnection(
714    const SocketAddress& local,
715    const SocketAddress& remote) {
716  SocketAddress local_normalized(local.ipaddr().Normalized(),
717                                 local.port());
718  SocketAddress remote_normalized(remote.ipaddr().Normalized(),
719                                  remote.port());
720  SocketAddressPair address_pair(local_normalized, remote_normalized);
721  ConnectionMap::iterator it = connections_->find(address_pair);
722  return (connections_->end() != it) ? it->second : NULL;
723}
724
725void VirtualSocketServer::RemoveConnection(const SocketAddress& local,
726                                           const SocketAddress& remote) {
727  SocketAddress local_normalized(local.ipaddr().Normalized(),
728                                local.port());
729  SocketAddress remote_normalized(remote.ipaddr().Normalized(),
730                                 remote.port());
731  SocketAddressPair address_pair(local_normalized, remote_normalized);
732  connections_->erase(address_pair);
733}
734
// Uniform pseudo-random value in [0, 1], drawn from rand().
static double Random() {
  const double scale = RAND_MAX;
  return rand() / scale;
}
738
739int VirtualSocketServer::Connect(VirtualSocket* socket,
740                                 const SocketAddress& remote_addr,
741                                 bool use_delay) {
742  uint32 delay = use_delay ? GetRandomTransitDelay() : 0;
743  VirtualSocket* remote = LookupBinding(remote_addr);
744  if (!CanInteractWith(socket, remote)) {
745    LOG(LS_INFO) << "Address family mismatch between "
746                 << socket->GetLocalAddress() << " and " << remote_addr;
747    return -1;
748  }
749  if (remote != NULL) {
750    SocketAddress addr = socket->GetLocalAddress();
751    msg_queue_->PostDelayed(delay, remote, MSG_ID_CONNECT,
752                            new MessageAddress(addr));
753  } else {
754    LOG(LS_INFO) << "No one listening at " << remote_addr;
755    msg_queue_->PostDelayed(delay, socket, MSG_ID_DISCONNECT);
756  }
757  return 0;
758}
759
760bool VirtualSocketServer::Disconnect(VirtualSocket* socket) {
761  if (socket) {
762    // Remove the mapping.
763    msg_queue_->Post(socket, MSG_ID_DISCONNECT);
764    return true;
765  }
766  return false;
767}
768
769int VirtualSocketServer::SendUdp(VirtualSocket* socket,
770                                 const char* data, size_t data_size,
771                                 const SocketAddress& remote_addr) {
772  // See if we want to drop this packet.
773  if (Random() < drop_prob_) {
774    LOG(LS_VERBOSE) << "Dropping packet: bad luck";
775    return static_cast<int>(data_size);
776  }
777
778  VirtualSocket* recipient = LookupBinding(remote_addr);
779  if (!recipient) {
780    // Make a fake recipient for address family checking.
781    scoped_ptr<VirtualSocket> dummy_socket(
782        CreateSocketInternal(AF_INET, SOCK_DGRAM));
783    dummy_socket->SetLocalAddress(remote_addr);
784    if (!CanInteractWith(socket, dummy_socket.get())) {
785      LOG(LS_VERBOSE) << "Incompatible address families: "
786                      << socket->GetLocalAddress() << " and " << remote_addr;
787      return -1;
788    }
789    LOG(LS_VERBOSE) << "No one listening at " << remote_addr;
790    return static_cast<int>(data_size);
791  }
792
793  if (!CanInteractWith(socket, recipient)) {
794    LOG(LS_VERBOSE) << "Incompatible address families: "
795                    << socket->GetLocalAddress() << " and " << remote_addr;
796    return -1;
797  }
798
799  CritScope cs(&socket->crit_);
800
801  uint32 cur_time = Time();
802  PurgeNetworkPackets(socket, cur_time);
803
804  // Determine whether we have enough bandwidth to accept this packet.  To do
805  // this, we need to update the send queue.  Once we know it's current size,
806  // we know whether we can fit this packet.
807  //
808  // NOTE: There are better algorithms for maintaining such a queue (such as
809  // "Derivative Random Drop"); however, this algorithm is a more accurate
810  // simulation of what a normal network would do.
811
812  size_t packet_size = data_size + UDP_HEADER_SIZE;
813  if (socket->network_size_ + packet_size > network_capacity_) {
814    LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
815    return static_cast<int>(data_size);
816  }
817
818  AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
819                     UDP_HEADER_SIZE, false);
820
821  return static_cast<int>(data_size);
822}
823
824void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
825  // TCP can't send more data than will fill up the receiver's buffer.
826  // We track the data that is in the buffer plus data in flight using the
827  // recipient's recv_buffer_size_.  Anything beyond that must be stored in the
828  // sender's buffer.  We will trigger the buffered data to be sent when data
829  // is read from the recv_buffer.
830
831  // Lookup the local/remote pair in the connections table.
832  VirtualSocket* recipient = LookupConnection(socket->local_addr_,
833                                              socket->remote_addr_);
834  if (!recipient) {
835    LOG(LS_VERBOSE) << "Sending data to no one.";
836    return;
837  }
838
839  CritScope cs(&socket->crit_);
840
841  uint32 cur_time = Time();
842  PurgeNetworkPackets(socket, cur_time);
843
844  while (true) {
845    size_t available = recv_buffer_capacity_ - recipient->recv_buffer_size_;
846    size_t max_data_size = _min<size_t>(available, TCP_MSS - TCP_HEADER_SIZE);
847    size_t data_size = _min(socket->send_buffer_.size(), max_data_size);
848    if (0 == data_size)
849      break;
850
851    AddPacketToNetwork(socket, recipient, cur_time, &socket->send_buffer_[0],
852                       data_size, TCP_HEADER_SIZE, true);
853    recipient->recv_buffer_size_ += data_size;
854
855    size_t new_buffer_size = socket->send_buffer_.size() - data_size;
856    // Avoid undefined access beyond the last element of the vector.
857    // This only happens when new_buffer_size is 0.
858    if (data_size < socket->send_buffer_.size()) {
859      // memmove is required for potentially overlapping source/destination.
860      memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size],
861              new_buffer_size);
862    }
863    socket->send_buffer_.resize(new_buffer_size);
864  }
865
866  if (socket->write_enabled_
867      && (socket->send_buffer_.size() < send_buffer_capacity_)) {
868    socket->write_enabled_ = false;
869    socket->SignalWriteEvent(socket);
870  }
871}
872
873void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
874                                             VirtualSocket* recipient,
875                                             uint32 cur_time,
876                                             const char* data,
877                                             size_t data_size,
878                                             size_t header_size,
879                                             bool ordered) {
880  VirtualSocket::NetworkEntry entry;
881  entry.size = data_size + header_size;
882
883  sender->network_size_ += entry.size;
884  uint32 send_delay = SendDelay(static_cast<uint32>(sender->network_size_));
885  entry.done_time = cur_time + send_delay;
886  sender->network_.push_back(entry);
887
888  // Find the delay for crossing the many virtual hops of the network.
889  uint32 transit_delay = GetRandomTransitDelay();
890
891  // Post the packet as a message to be delivered (on our own thread)
892  Packet* p = new Packet(data, data_size, sender->local_addr_);
893  uint32 ts = TimeAfter(send_delay + transit_delay);
894  if (ordered) {
895    // Ensure that new packets arrive after previous ones
896    // TODO: consider ordering on a per-socket basis, since this
897    // introduces artifical delay.
898    ts = TimeMax(ts, network_delay_);
899  }
900  msg_queue_->PostAt(ts, recipient, MSG_ID_PACKET, p);
901  network_delay_ = TimeMax(ts, network_delay_);
902}
903
904void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
905                                              uint32 cur_time) {
906  while (!socket->network_.empty() &&
907         (socket->network_.front().done_time <= cur_time)) {
908    ASSERT(socket->network_size_ >= socket->network_.front().size);
909    socket->network_size_ -= socket->network_.front().size;
910    socket->network_.pop_front();
911  }
912}
913
914uint32 VirtualSocketServer::SendDelay(uint32 size) {
915  if (bandwidth_ == 0)
916    return 0;
917  else
918    return 1000 * size / bandwidth_;
919}
920
// Disabled debugging aid: would print each (x, y) point of |f| and the mean
// and standard deviation of the y values.
// NOTE(review): the leading `return;` makes everything after it unreachable,
// and std::cout would also need <iostream> if this were ever re-enabled.
#if 0
void PrintFunction(std::vector<std::pair<double, double> >* f) {
  return;
  double sum = 0;
  for (uint32 i = 0; i < f->size(); ++i) {
    std::cout << (*f)[i].first << '\t' << (*f)[i].second << std::endl;
    sum += (*f)[i].second;
  }
  if (!f->empty()) {
    const double mean = sum / f->size();
    double sum_sq_dev = 0;
    for (uint32 i = 0; i < f->size(); ++i) {
      double dev = (*f)[i].second - mean;
      sum_sq_dev += dev * dev;
    }
    std::cout << "Mean = " << mean << " StdDev = "
              << sqrt(sum_sq_dev / f->size()) << std::endl;
  }
}
#endif  // <unused>
941
942void VirtualSocketServer::UpdateDelayDistribution() {
943  Function* dist = CreateDistribution(delay_mean_, delay_stddev_,
944                                      delay_samples_);
945  // We take a lock just to make sure we don't leak memory.
946  {
947    CritScope cs(&delay_crit_);
948    delete delay_dist_;
949    delay_dist_ = dist;
950  }
951}
952
// pi, computed as 4 * atan(1).
static const double PI = 4 * atan(1.0);

// Probability density of the normal distribution with the given |mean| and
// |stddev|, evaluated at |x|:
//   exp(-(x - mean)^2 / (2 * stddev^2)) / (stddev * sqrt(2 * pi)).
static double Normal(double x, double mean, double stddev) {
  const double offset = x - mean;
  const double exponent = offset * offset / (2 * stddev * stddev);
  const double scale = stddev * sqrt(2 * PI);
  return exp(-exponent) / scale;
}
959
960#if 0  // static unused gives a warning
961static double Pareto(double x, double min, double k) {
962  if (x < min)
963    return 0;
964  else
965    return k * std::pow(min, k) / std::pow(x, k+1);
966}
967#endif
968
969VirtualSocketServer::Function* VirtualSocketServer::CreateDistribution(
970    uint32 mean, uint32 stddev, uint32 samples) {
971  Function* f = new Function();
972
973  if (0 == stddev) {
974    f->push_back(Point(mean, 1.0));
975  } else {
976    double start = 0;
977    if (mean >= 4 * static_cast<double>(stddev))
978      start = mean - 4 * static_cast<double>(stddev);
979    double end = mean + 4 * static_cast<double>(stddev);
980
981    for (uint32 i = 0; i < samples; i++) {
982      double x = start + (end - start) * i / (samples - 1);
983      double y = Normal(x, mean, stddev);
984      f->push_back(Point(x, y));
985    }
986  }
987  return Resample(Invert(Accumulate(f)), 0, 1, samples);
988}
989
990uint32 VirtualSocketServer::GetRandomTransitDelay() {
991  size_t index = rand() % delay_dist_->size();
992  double delay = (*delay_dist_)[index].second;
993  //LOG_F(LS_INFO) << "random[" << index << "] = " << delay;
994  return static_cast<uint32>(delay);
995}
996
997struct FunctionDomainCmp {
998  bool operator()(const VirtualSocketServer::Point& p1,
999                   const VirtualSocketServer::Point& p2) {
1000    return p1.first < p2.first;
1001  }
1002  bool operator()(double v1, const VirtualSocketServer::Point& p2) {
1003    return v1 < p2.first;
1004  }
1005  bool operator()(const VirtualSocketServer::Point& p1, double v2) {
1006    return p1.first < v2;
1007  }
1008};
1009
1010VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
1011  ASSERT(f->size() >= 1);
1012  double v = 0;
1013  for (Function::size_type i = 0; i < f->size() - 1; ++i) {
1014    double dx = (*f)[i + 1].first - (*f)[i].first;
1015    double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2;
1016    (*f)[i].second = v;
1017    v = v + dx * avgy;
1018  }
1019  (*f)[f->size()-1].second = v;
1020  return f;
1021}
1022
1023VirtualSocketServer::Function* VirtualSocketServer::Invert(Function* f) {
1024  for (Function::size_type i = 0; i < f->size(); ++i)
1025    std::swap((*f)[i].first, (*f)[i].second);
1026
1027  std::sort(f->begin(), f->end(), FunctionDomainCmp());
1028  return f;
1029}
1030
1031VirtualSocketServer::Function* VirtualSocketServer::Resample(
1032    Function* f, double x1, double x2, uint32 samples) {
1033  Function* g = new Function();
1034
1035  for (size_t i = 0; i < samples; i++) {
1036    double x = x1 + (x2 - x1) * i / (samples - 1);
1037    double y = Evaluate(f, x);
1038    g->push_back(Point(x, y));
1039  }
1040
1041  delete f;
1042  return g;
1043}
1044
1045double VirtualSocketServer::Evaluate(Function* f, double x) {
1046  Function::iterator iter =
1047      std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());
1048  if (iter == f->begin()) {
1049    return (*f)[0].second;
1050  } else if (iter == f->end()) {
1051    ASSERT(f->size() >= 1);
1052    return (*f)[f->size() - 1].second;
1053  } else if (iter->first == x) {
1054    return iter->second;
1055  } else {
1056    double x1 = (iter - 1)->first;
1057    double y1 = (iter - 1)->second;
1058    double x2 = iter->first;
1059    double y2 = iter->second;
1060    return y1 + (y2 - y1) * (x - x1) / (x2 - x1);
1061  }
1062}
1063
1064bool VirtualSocketServer::CanInteractWith(VirtualSocket* local,
1065                                          VirtualSocket* remote) {
1066  if (!local || !remote) {
1067    return false;
1068  }
1069  IPAddress local_ip = local->GetLocalAddress().ipaddr();
1070  IPAddress remote_ip = remote->GetLocalAddress().ipaddr();
1071  IPAddress local_normalized = local_ip.Normalized();
1072  IPAddress remote_normalized = remote_ip.Normalized();
1073  // Check if the addresses are the same family after Normalization (turns
1074  // mapped IPv6 address into IPv4 addresses).
1075  // This will stop unmapped V6 addresses from talking to mapped V6 addresses.
1076  if (local_normalized.family() == remote_normalized.family()) {
1077    return true;
1078  }
1079
1080  // If ip1 is IPv4 and ip2 is :: and ip2 is not IPV6_V6ONLY.
1081  int remote_v6_only = 0;
1082  remote->GetOption(Socket::OPT_IPV6_V6ONLY, &remote_v6_only);
1083  if (local_ip.family() == AF_INET && !remote_v6_only && IPIsAny(remote_ip)) {
1084    return true;
1085  }
1086  // Same check, backwards.
1087  int local_v6_only = 0;
1088  local->GetOption(Socket::OPT_IPV6_V6ONLY, &local_v6_only);
1089  if (remote_ip.family() == AF_INET && !local_v6_only && IPIsAny(local_ip)) {
1090    return true;
1091  }
1092
1093  // Check to see if either socket was explicitly bound to IPv6-any.
1094  // These sockets can talk with anyone.
1095  if (local_ip.family() == AF_INET6 && local->was_any()) {
1096    return true;
1097  }
1098  if (remote_ip.family() == AF_INET6 && remote->was_any()) {
1099    return true;
1100  }
1101
1102  return false;
1103}
1104
1105}  // namespace rtc
1106