1/*
2 *  Copyright 2006 The WebRTC Project Authors. All rights reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include <math.h>
12#include <time.h>
13#if defined(WEBRTC_POSIX)
14#include <netinet/in.h>
15#endif
16
17#include "webrtc/base/logging.h"
18#include "webrtc/base/gunit.h"
19#include "webrtc/base/testclient.h"
20#include "webrtc/base/testutils.h"
21#include "webrtc/base/thread.h"
22#include "webrtc/base/timeutils.h"
23#include "webrtc/base/virtualsocketserver.h"
24#include "webrtc/test/testsupport/gtest_disable.h"
25
26using namespace rtc;
27
28// Sends at a constant rate but with random packet sizes.
29struct Sender : public MessageHandler {
30  Sender(Thread* th, AsyncSocket* s, uint32 rt)
31      : thread(th), socket(new AsyncUDPSocket(s)),
32        done(false), rate(rt), count(0) {
33    last_send = rtc::Time();
34    thread->PostDelayed(NextDelay(), this, 1);
35  }
36
37  uint32 NextDelay() {
38    uint32 size = (rand() % 4096) + 1;
39    return 1000 * size / rate;
40  }
41
42  void OnMessage(Message* pmsg) {
43    ASSERT_EQ(1u, pmsg->message_id);
44
45    if (done)
46      return;
47
48    uint32 cur_time = rtc::Time();
49    uint32 delay = cur_time - last_send;
50    uint32 size = rate * delay / 1000;
51    size = std::min<uint32>(size, 4096);
52    size = std::max<uint32>(size, sizeof(uint32));
53
54    count += size;
55    memcpy(dummy, &cur_time, sizeof(cur_time));
56    socket->Send(dummy, size, options);
57
58    last_send = cur_time;
59    thread->PostDelayed(NextDelay(), this, 1);
60  }
61
62  Thread* thread;
63  scoped_ptr<AsyncUDPSocket> socket;
64  rtc::PacketOptions options;
65  bool done;
66  uint32 rate;  // bytes per second
67  uint32 count;
68  uint32 last_send;
69  char dummy[4096];
70};
71
72struct Receiver : public MessageHandler, public sigslot::has_slots<> {
73  Receiver(Thread* th, AsyncSocket* s, uint32 bw)
74      : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
75        count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
76    socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
77    thread->PostDelayed(1000, this, 1);
78  }
79
80  ~Receiver() {
81    thread->Clear(this);
82  }
83
84  void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
85                    const SocketAddress& remote_addr,
86                    const PacketTime& packet_time) {
87    ASSERT_EQ(socket.get(), s);
88    ASSERT_GE(size, 4U);
89
90    count += size;
91    sec_count += size;
92
93    uint32 send_time = *reinterpret_cast<const uint32*>(data);
94    uint32 recv_time = rtc::Time();
95    uint32 delay = recv_time - send_time;
96    sum += delay;
97    sum_sq += delay * delay;
98    samples += 1;
99  }
100
101  void OnMessage(Message* pmsg) {
102    ASSERT_EQ(1u, pmsg->message_id);
103
104    if (done)
105      return;
106
107    // It is always possible for us to receive more than expected because
108    // packets can be further delayed in delivery.
109    if (bandwidth > 0)
110      ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
111    sec_count = 0;
112    thread->PostDelayed(1000, this, 1);
113  }
114
115  Thread* thread;
116  scoped_ptr<AsyncUDPSocket> socket;
117  uint32 bandwidth;
118  bool done;
119  size_t count;
120  size_t sec_count;
121  double sum;
122  double sum_sq;
123  uint32 samples;
124};
125
126class VirtualSocketServerTest : public testing::Test {
127 public:
128  VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
129                              kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
130                              kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
131  }
132
133  void CheckAddressIncrementalization(const SocketAddress& post,
134                                      const SocketAddress& pre) {
135    EXPECT_EQ(post.port(), pre.port() + 1);
136    IPAddress post_ip = post.ipaddr();
137    IPAddress pre_ip = pre.ipaddr();
138    EXPECT_EQ(pre_ip.family(), post_ip.family());
139    if (post_ip.family() == AF_INET) {
140      in_addr pre_ipv4 = pre_ip.ipv4_address();
141      in_addr post_ipv4 = post_ip.ipv4_address();
142      int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
143      EXPECT_EQ(1, difference);
144    } else if (post_ip.family() == AF_INET6) {
145      in6_addr post_ip6 = post_ip.ipv6_address();
146      in6_addr pre_ip6 = pre_ip.ipv6_address();
147      uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
148      uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
149      EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
150    }
151  }
152
153  void BasicTest(const SocketAddress& initial_addr) {
154    AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(),
155                                                 SOCK_DGRAM);
156    socket->Bind(initial_addr);
157    SocketAddress server_addr = socket->GetLocalAddress();
158    // Make sure VSS didn't switch families on us.
159    EXPECT_EQ(server_addr.family(), initial_addr.family());
160
161    TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
162    AsyncSocket* socket2 =
163        ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
164    TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
165
166    SocketAddress client2_addr;
167    EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
168    EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
169
170    SocketAddress client1_addr;
171    EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
172    EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
173    EXPECT_EQ(client1_addr, server_addr);
174
175    SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family());
176    for (int i = 0; i < 10; i++) {
177      client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty));
178
179      SocketAddress next_client2_addr;
180      EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
181      EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
182      CheckAddressIncrementalization(next_client2_addr, client2_addr);
183      // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
184
185      SocketAddress server_addr2;
186      EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
187      EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
188      EXPECT_EQ(server_addr2, server_addr);
189
190      client2_addr = next_client2_addr;
191    }
192  }
193
194  // initial_addr should be made from either INADDR_ANY or in6addr_any.
195  void ConnectTest(const SocketAddress& initial_addr) {
196    testing::StreamSink sink;
197    SocketAddress accept_addr;
198    const SocketAddress kEmptyAddr =
199        EmptySocketAddressWithFamily(initial_addr.family());
200
201    // Create client
202    AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
203                                                 SOCK_STREAM);
204    sink.Monitor(client);
205    EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
206    EXPECT_TRUE(client->GetLocalAddress().IsNil());
207
208    // Create server
209    AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
210                                                 SOCK_STREAM);
211    sink.Monitor(server);
212    EXPECT_NE(0, server->Listen(5));  // Bind required
213    EXPECT_EQ(0, server->Bind(initial_addr));
214    EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
215    EXPECT_EQ(0, server->Listen(5));
216    EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
217
218    // No pending server connections
219    EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
220    EXPECT_TRUE(NULL == server->Accept(&accept_addr));
221    EXPECT_EQ(AF_UNSPEC, accept_addr.family());
222
223    // Attempt connect to listening socket
224    EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
225    EXPECT_NE(client->GetLocalAddress(), kEmptyAddr);  // Implicit Bind
226    EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family());  // Implicit Bind
227    EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
228
229    // Client is connecting
230    EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
231    EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
232    EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
233
234    ss_->ProcessMessagesUntilIdle();
235
236    // Client still connecting
237    EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
238    EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
239    EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
240
241    // Server has pending connection
242    EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
243    Socket* accepted = server->Accept(&accept_addr);
244    EXPECT_TRUE(NULL != accepted);
245    EXPECT_NE(accept_addr, kEmptyAddr);
246    EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
247
248    EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
249    EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
250    EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
251
252    ss_->ProcessMessagesUntilIdle();
253
254    // Client has connected
255    EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
256    EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
257    EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
258    EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
259    EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
260  }
261
262  void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
263    testing::StreamSink sink;
264    SocketAddress accept_addr;
265    const SocketAddress nil_addr;
266    const SocketAddress empty_addr =
267        EmptySocketAddressWithFamily(initial_addr.family());
268
269    // Create client
270    AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
271                                                 SOCK_STREAM);
272    sink.Monitor(client);
273
274    // Create server
275    AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
276                                                 SOCK_STREAM);
277    sink.Monitor(server);
278    EXPECT_EQ(0, server->Bind(initial_addr));
279    EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
280    // Attempt connect to non-listening socket
281    EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
282
283    ss_->ProcessMessagesUntilIdle();
284
285    // No pending server connections
286    EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
287    EXPECT_TRUE(NULL == server->Accept(&accept_addr));
288    EXPECT_EQ(accept_addr, nil_addr);
289
290    // Connection failed
291    EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
292    EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
293    EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
294    EXPECT_EQ(client->GetRemoteAddress(), nil_addr);
295  }
296
297  void CloseDuringConnectTest(const SocketAddress& initial_addr) {
298    testing::StreamSink sink;
299    SocketAddress accept_addr;
300    const SocketAddress empty_addr =
301        EmptySocketAddressWithFamily(initial_addr.family());
302
303    // Create client and server
304    scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(),
305                                                          SOCK_STREAM));
306    sink.Monitor(client.get());
307    scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(),
308                                                          SOCK_STREAM));
309    sink.Monitor(server.get());
310
311    // Initiate connect
312    EXPECT_EQ(0, server->Bind(initial_addr));
313    EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
314
315    EXPECT_EQ(0, server->Listen(5));
316    EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
317
318    // Server close before socket enters accept queue
319    EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
320    server->Close();
321
322    ss_->ProcessMessagesUntilIdle();
323
324    // Result: connection failed
325    EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
326    EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
327
328    server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
329    sink.Monitor(server.get());
330
331    // Initiate connect
332    EXPECT_EQ(0, server->Bind(initial_addr));
333    EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
334
335    EXPECT_EQ(0, server->Listen(5));
336    EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
337
338    ss_->ProcessMessagesUntilIdle();
339
340    // Server close while socket is in accept queue
341    EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
342    server->Close();
343
344    ss_->ProcessMessagesUntilIdle();
345
346    // Result: connection failed
347    EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
348    EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
349
350    // New server
351    server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
352    sink.Monitor(server.get());
353
354    // Initiate connect
355    EXPECT_EQ(0, server->Bind(initial_addr));
356    EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
357
358    EXPECT_EQ(0, server->Listen(5));
359    EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
360
361    ss_->ProcessMessagesUntilIdle();
362
363    // Server accepts connection
364    EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
365    scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
366    ASSERT_TRUE(NULL != accepted.get());
367    sink.Monitor(accepted.get());
368
369    // Client closes before connection complets
370    EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
371
372    // Connected message has not been processed yet.
373    EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
374    client->Close();
375
376    ss_->ProcessMessagesUntilIdle();
377
378    // Result: accepted socket closes
379    EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
380    EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE));
381    EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
382  }
383
384  void CloseTest(const SocketAddress& initial_addr) {
385    testing::StreamSink sink;
386    const SocketAddress kEmptyAddr;
387
388    // Create clients
389    AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
390    sink.Monitor(a);
391    a->Bind(initial_addr);
392    EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
393
394
395    scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(),
396                                                     SOCK_STREAM));
397    sink.Monitor(b.get());
398    b->Bind(initial_addr);
399    EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
400
401    EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
402    EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
403
404    ss_->ProcessMessagesUntilIdle();
405
406    EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
407    EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
408    EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
409
410    EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN));
411    EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
412    EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
413
414    EXPECT_EQ(1, a->Send("a", 1));
415    b->Close();
416    EXPECT_EQ(1, a->Send("b", 1));
417
418    ss_->ProcessMessagesUntilIdle();
419
420    char buffer[10];
421    EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ));
422    EXPECT_EQ(-1, b->Recv(buffer, 10));
423
424    EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
425    EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
426    EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
427
428    // No signal for Closer
429    EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE));
430    EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
431    EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
432  }
433
434  void TcpSendTest(const SocketAddress& initial_addr) {
435    testing::StreamSink sink;
436    const SocketAddress kEmptyAddr;
437
438    // Connect two sockets
439    AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
440    sink.Monitor(a);
441    a->Bind(initial_addr);
442    EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
443
444    AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
445    sink.Monitor(b);
446    b->Bind(initial_addr);
447    EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
448
449    EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
450    EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
451
452    ss_->ProcessMessagesUntilIdle();
453
454    const size_t kBufferSize = 2000;
455    ss_->set_send_buffer_capacity(kBufferSize);
456    ss_->set_recv_buffer_capacity(kBufferSize);
457
458    const size_t kDataSize = 5000;
459    char send_buffer[kDataSize], recv_buffer[kDataSize];
460    for (size_t i = 0; i < kDataSize; ++i)
461      send_buffer[i] = static_cast<char>(i % 256);
462    memset(recv_buffer, 0, sizeof(recv_buffer));
463    size_t send_pos = 0, recv_pos = 0;
464
465    // Can't send more than send buffer in one write
466    int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
467    EXPECT_EQ(static_cast<int>(kBufferSize), result);
468    send_pos += result;
469
470    ss_->ProcessMessagesUntilIdle();
471    EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
472    EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
473
474    // Receive buffer is already filled, fill send buffer again
475    result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
476    EXPECT_EQ(static_cast<int>(kBufferSize), result);
477    send_pos += result;
478
479    ss_->ProcessMessagesUntilIdle();
480    EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
481    EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
482
483    // No more room in send or receive buffer
484    result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
485    EXPECT_EQ(-1, result);
486    EXPECT_TRUE(a->IsBlocking());
487
488    // Read a subset of the data
489    result = b->Recv(recv_buffer + recv_pos, 500);
490    EXPECT_EQ(500, result);
491    recv_pos += result;
492
493    ss_->ProcessMessagesUntilIdle();
494    EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
495    EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
496
497    // Room for more on the sending side
498    result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
499    EXPECT_EQ(500, result);
500    send_pos += result;
501
502    // Empty the recv buffer
503    while (true) {
504      result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
505      if (result < 0) {
506        EXPECT_EQ(-1, result);
507        EXPECT_TRUE(b->IsBlocking());
508        break;
509      }
510      recv_pos += result;
511    }
512
513    ss_->ProcessMessagesUntilIdle();
514    EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
515
516    // Continue to empty the recv buffer
517    while (true) {
518      result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
519      if (result < 0) {
520        EXPECT_EQ(-1, result);
521        EXPECT_TRUE(b->IsBlocking());
522        break;
523      }
524      recv_pos += result;
525    }
526
527    // Send last of the data
528    result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
529    EXPECT_EQ(500, result);
530    send_pos += result;
531
532    ss_->ProcessMessagesUntilIdle();
533    EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
534
535    // Receive the last of the data
536    while (true) {
537      result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
538      if (result < 0) {
539        EXPECT_EQ(-1, result);
540        EXPECT_TRUE(b->IsBlocking());
541        break;
542      }
543      recv_pos += result;
544    }
545
546    ss_->ProcessMessagesUntilIdle();
547    EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
548
549    // The received data matches the sent data
550    EXPECT_EQ(kDataSize, send_pos);
551    EXPECT_EQ(kDataSize, recv_pos);
552    EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
553  }
554
555  void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
556    const SocketAddress kEmptyAddr;
557
558    // Connect two sockets
559    AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(),
560                                            SOCK_STREAM);
561    AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(),
562                                            SOCK_STREAM);
563    a->Bind(initial_addr);
564    EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
565
566    b->Bind(initial_addr);
567    EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
568
569    EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
570    EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
571    ss_->ProcessMessagesUntilIdle();
572
573    // First, deliver all packets in 0 ms.
574    char buffer[2] = { 0, 0 };
575    const char cNumPackets = 10;
576    for (char i = 0; i < cNumPackets; ++i) {
577      buffer[0] = '0' + i;
578      EXPECT_EQ(1, a->Send(buffer, 1));
579    }
580
581    ss_->ProcessMessagesUntilIdle();
582
583    for (char i = 0; i < cNumPackets; ++i) {
584      EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
585      EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
586    }
587
588    // Next, deliver packets at random intervals
589    const uint32 mean = 50;
590    const uint32 stddev = 50;
591
592    ss_->set_delay_mean(mean);
593    ss_->set_delay_stddev(stddev);
594    ss_->UpdateDelayDistribution();
595
596    for (char i = 0; i < cNumPackets; ++i) {
597      buffer[0] = 'A' + i;
598      EXPECT_EQ(1, a->Send(buffer, 1));
599    }
600
601    ss_->ProcessMessagesUntilIdle();
602
603    for (char i = 0; i < cNumPackets; ++i) {
604      EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
605      EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
606    }
607  }
608
609  void BandwidthTest(const SocketAddress& initial_addr) {
610    AsyncSocket* send_socket =
611        ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
612    AsyncSocket* recv_socket =
613        ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
614    ASSERT_EQ(0, send_socket->Bind(initial_addr));
615    ASSERT_EQ(0, recv_socket->Bind(initial_addr));
616    EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
617    EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
618    ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
619
620    uint32 bandwidth = 64 * 1024;
621    ss_->set_bandwidth(bandwidth);
622
623    Thread* pthMain = Thread::Current();
624    Sender sender(pthMain, send_socket, 80 * 1024);
625    Receiver receiver(pthMain, recv_socket, bandwidth);
626
627    pthMain->ProcessMessages(5000);
628    sender.done = true;
629    pthMain->ProcessMessages(5000);
630
631    ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
632    ASSERT_TRUE(receiver.count <= 6 * bandwidth);  // queue could drain for 1s
633
634    ss_->set_bandwidth(0);
635  }
636
637  void DelayTest(const SocketAddress& initial_addr) {
638    time_t seed = ::time(NULL);
639    LOG(LS_VERBOSE) << "seed = " << seed;
640    srand(static_cast<unsigned int>(seed));
641
642    const uint32 mean = 2000;
643    const uint32 stddev = 500;
644
645    ss_->set_delay_mean(mean);
646    ss_->set_delay_stddev(stddev);
647    ss_->UpdateDelayDistribution();
648
649    AsyncSocket* send_socket =
650        ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
651    AsyncSocket* recv_socket =
652        ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
653    ASSERT_EQ(0, send_socket->Bind(initial_addr));
654    ASSERT_EQ(0, recv_socket->Bind(initial_addr));
655    EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
656    EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
657    ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
658
659    Thread* pthMain = Thread::Current();
660    // Avg packet size is 2K, so at 200KB/s for 10s, we should see about
661    // 1000 packets, which is necessary to get a good distribution.
662    Sender sender(pthMain, send_socket, 100 * 2 * 1024);
663    Receiver receiver(pthMain, recv_socket, 0);
664
665    pthMain->ProcessMessages(10000);
666    sender.done = receiver.done = true;
667    ss_->ProcessMessagesUntilIdle();
668
669    const double sample_mean = receiver.sum / receiver.samples;
670    double num =
671        receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
672    double den = receiver.samples * (receiver.samples - 1);
673    const double sample_stddev = sqrt(num / den);
674    LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
675
676    EXPECT_LE(500u, receiver.samples);
677    // We initially used a 0.1 fudge factor, but on the build machine, we
678    // have seen the value differ by as much as 0.13.
679    EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
680    EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
681
682    ss_->set_delay_mean(0);
683    ss_->set_delay_stddev(0);
684    ss_->UpdateDelayDistribution();
685  }
686
687  // Test cross-family communication between a client bound to client_addr and a
688  // server bound to server_addr. shouldSucceed indicates if communication is
689  // expected to work or not.
690  void CrossFamilyConnectionTest(const SocketAddress& client_addr,
691                                 const SocketAddress& server_addr,
692                                 bool shouldSucceed) {
693    testing::StreamSink sink;
694    SocketAddress accept_address;
695    const SocketAddress kEmptyAddr;
696
697    // Client gets a IPv4 address
698    AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(),
699                                                 SOCK_STREAM);
700    sink.Monitor(client);
701    EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
702    EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
703    client->Bind(client_addr);
704
705    // Server gets a non-mapped non-any IPv6 address.
706    // IPv4 sockets should not be able to connect to this.
707    AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(),
708                                                 SOCK_STREAM);
709    sink.Monitor(server);
710    server->Bind(server_addr);
711    server->Listen(5);
712
713    if (shouldSucceed) {
714      EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
715      ss_->ProcessMessagesUntilIdle();
716      EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
717      Socket* accepted = server->Accept(&accept_address);
718      EXPECT_TRUE(NULL != accepted);
719      EXPECT_NE(kEmptyAddr, accept_address);
720      ss_->ProcessMessagesUntilIdle();
721      EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
722      EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
723    } else {
724      // Check that the connection failed.
725      EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
726      ss_->ProcessMessagesUntilIdle();
727
728      EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
729      EXPECT_TRUE(NULL == server->Accept(&accept_address));
730      EXPECT_EQ(accept_address, kEmptyAddr);
731      EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
732      EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
733      EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
734    }
735  }
736
737  // Test cross-family datagram sending between a client bound to client_addr
738  // and a server bound to server_addr. shouldSucceed indicates if sending is
739  // expected to succed or not.
740  void CrossFamilyDatagramTest(const SocketAddress& client_addr,
741                               const SocketAddress& server_addr,
742                               bool shouldSucceed) {
743    AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
744    socket->Bind(server_addr);
745    SocketAddress bound_server_addr = socket->GetLocalAddress();
746    TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
747
748    AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
749    socket2->Bind(client_addr);
750    TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
751    SocketAddress client2_addr;
752
753    if (shouldSucceed) {
754      EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
755      EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
756      SocketAddress client1_addr;
757      EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
758      EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
759      EXPECT_EQ(client1_addr, bound_server_addr);
760    } else {
761      EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
762      EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0));
763    }
764  }
765
766 protected:
767  virtual void SetUp() {
768    Thread::Current()->set_socketserver(ss_);
769  }
770  virtual void TearDown() {
771    Thread::Current()->set_socketserver(NULL);
772  }
773
774  VirtualSocketServer* ss_;
775  const SocketAddress kIPv4AnyAddress;
776  const SocketAddress kIPv6AnyAddress;
777};
778
779TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(basic_v4)) {
780  SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000);
781  BasicTest(ipv4_test_addr);
782}
783
784TEST_F(VirtualSocketServerTest,DISABLED_ON_MAC( basic_v6)) {
785  SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000);
786  BasicTest(ipv6_test_addr);
787}
788
789TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(connect_v4)) {
790  ConnectTest(kIPv4AnyAddress);
791}
792
793TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(connect_v6)) {
794  ConnectTest(kIPv6AnyAddress);
795}
796
797TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(connect_to_non_listener_v4)) {
798  ConnectToNonListenerTest(kIPv4AnyAddress);
799}
800
801TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(connect_to_non_listener_v6)) {
802  ConnectToNonListenerTest(kIPv6AnyAddress);
803}
804
805TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(close_during_connect_v4)) {
806  CloseDuringConnectTest(kIPv4AnyAddress);
807}
808
809TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(close_during_connect_v6)) {
810  CloseDuringConnectTest(kIPv6AnyAddress);
811}
812
813TEST_F(VirtualSocketServerTest, close_v4) {
814  CloseTest(kIPv4AnyAddress);
815}
816
817TEST_F(VirtualSocketServerTest, close_v6) {
818  CloseTest(kIPv6AnyAddress);
819}
820
821TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(tcp_send_v4)) {
822  TcpSendTest(kIPv4AnyAddress);
823}
824
825TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(tcp_send_v6)) {
826  TcpSendTest(kIPv6AnyAddress);
827}
828
829TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
830  TcpSendsPacketsInOrderTest(kIPv4AnyAddress);
831}
832
833TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
834  TcpSendsPacketsInOrderTest(kIPv6AnyAddress);
835}
836
837TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(bandwidth_v4)) {
838  SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
839  BandwidthTest(ipv4_test_addr);
840}
841
842TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(bandwidth_v6)) {
843  SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
844  BandwidthTest(ipv6_test_addr);
845}
846
847TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(delay_v4)) {
848  SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
849  DelayTest(ipv4_test_addr);
850}
851
852// See: https://code.google.com/p/webrtc/issues/detail?id=2409
853TEST_F(VirtualSocketServerTest, DISABLED_delay_v6) {
854  SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
855  DelayTest(ipv6_test_addr);
856}
857
858// Works, receiving socket sees 127.0.0.2.
859TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromMappedIPv6ToIPv4Any)) {
860  CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0),
861                            SocketAddress("0.0.0.0", 5000),
862                            true);
863}
864
865// Fails.
866TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantConnectFromUnMappedIPv6ToIPv4Any)) {
867  CrossFamilyConnectionTest(SocketAddress("::2", 0),
868                            SocketAddress("0.0.0.0", 5000),
869                            false);
870}
871
872// Fails.
873TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantConnectFromUnMappedIPv6ToMappedIPv6)) {
874  CrossFamilyConnectionTest(SocketAddress("::2", 0),
875                            SocketAddress("::ffff:127.0.0.1", 5000),
876                            false);
877}
878
879// Works. receiving socket sees ::ffff:127.0.0.2.
880TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromIPv4ToIPv6Any)) {
881  CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
882                            SocketAddress("::", 5000),
883                            true);
884}
885
886// Fails.
887TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantConnectFromIPv4ToUnMappedIPv6)) {
888  CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
889                            SocketAddress("::1", 5000),
890                            false);
891}
892
893// Works. Receiving socket sees ::ffff:127.0.0.1.
894TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromIPv4ToMappedIPv6)) {
895  CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0),
896                            SocketAddress("::ffff:127.0.0.2", 5000),
897                            true);
898}
899
900// Works, receiving socket sees a result from GetNextIP.
901TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromUnboundIPv6ToIPv4Any)) {
902  CrossFamilyConnectionTest(SocketAddress("::", 0),
903                            SocketAddress("0.0.0.0", 5000),
904                            true);
905}
906
907// Works, receiving socket sees whatever GetNextIP gave the client.
908TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanConnectFromUnboundIPv4ToIPv6Any)) {
909  CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0),
910                            SocketAddress("::", 5000),
911                            true);
912}
913
914TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromUnboundIPv4ToIPv6Any)) {
915  CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0),
916                          SocketAddress("::", 5000),
917                          true);
918}
919
920TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromMappedIPv6ToIPv4Any)) {
921  CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0),
922                          SocketAddress("0.0.0.0", 5000),
923                          true);
924}
925
926TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantSendDatagramFromUnMappedIPv6ToIPv4Any)) {
927  CrossFamilyDatagramTest(SocketAddress("::2", 0),
928                          SocketAddress("0.0.0.0", 5000),
929                          false);
930}
931
932TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantSendDatagramFromUnMappedIPv6ToMappedIPv6)) {
933  CrossFamilyDatagramTest(SocketAddress("::2", 0),
934                          SocketAddress("::ffff:127.0.0.1", 5000),
935                          false);
936}
937
938TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromIPv4ToIPv6Any)) {
939  CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
940                          SocketAddress("::", 5000),
941                          true);
942}
943
944TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CantSendDatagramFromIPv4ToUnMappedIPv6)) {
945  CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
946                          SocketAddress("::1", 5000),
947                          false);
948}
949
950TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromIPv4ToMappedIPv6)) {
951  CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0),
952                          SocketAddress("::ffff:127.0.0.2", 5000),
953                          true);
954}
955
956TEST_F(VirtualSocketServerTest, DISABLED_ON_MAC(CanSendDatagramFromUnboundIPv6ToIPv4Any)) {
957  CrossFamilyDatagramTest(SocketAddress("::", 0),
958                          SocketAddress("0.0.0.0", 5000),
959                          true);
960}
961
962TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
963  const uint32 kTestMean[] = { 10, 100, 333, 1000 };
964  const double kTestDev[] = { 0.25, 0.1, 0.01 };
965  // TODO: The current code only works for 1000 data points or more.
966  const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
967  for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
968    for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
969      for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
970        ASSERT_LT(0u, kTestSamples[sidx]);
971        const uint32 kStdDev =
972            static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
973        VirtualSocketServer::Function* f =
974            VirtualSocketServer::CreateDistribution(kTestMean[midx],
975                                                    kStdDev,
976                                                    kTestSamples[sidx]);
977        ASSERT_TRUE(NULL != f);
978        ASSERT_EQ(kTestSamples[sidx], f->size());
979        double sum = 0;
980        for (uint32 i = 0; i < f->size(); ++i) {
981          sum += (*f)[i].second;
982        }
983        const double mean = sum / f->size();
984        double sum_sq_dev = 0;
985        for (uint32 i = 0; i < f->size(); ++i) {
986          double dev = (*f)[i].second - mean;
987          sum_sq_dev += dev * dev;
988        }
989        const double stddev = sqrt(sum_sq_dev / f->size());
990        EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
991          << "M=" << kTestMean[midx]
992          << " SD=" << kStdDev
993          << " N=" << kTestSamples[sidx];
994        EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
995          << "M=" << kTestMean[midx]
996          << " SD=" << kStdDev
997          << " N=" << kTestSamples[sidx];
998        delete f;
999      }
1000    }
1001  }
1002}
1003