1/*
2 * libjingle
3 * Copyright 2007, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/base/socket_unittest.h"
29
30#include "talk/base/asyncudpsocket.h"
31#include "talk/base/gunit.h"
32#include "talk/base/nethelpers.h"
33#include "talk/base/socketserver.h"
34#include "talk/base/testclient.h"
35#include "talk/base/testutils.h"
36#include "talk/base/thread.h"
37
38namespace talk_base {
39
40#define MAYBE_SKIP_IPV6                             \
41  if (!HasIPv6Enabled()) {                          \
42    LOG(LS_INFO) << "No IPv6... skipping";          \
43    return;                                         \
44  }
45
46
47void SocketTest::TestConnectIPv4() {
48  ConnectInternal(kIPv4Loopback);
49}
50
51void SocketTest::TestConnectIPv6() {
52  MAYBE_SKIP_IPV6;
53  ConnectInternal(kIPv6Loopback);
54}
55
56void SocketTest::TestConnectWithDnsLookupIPv4() {
57  ConnectWithDnsLookupInternal(kIPv4Loopback, "localhost");
58}
59
60void SocketTest::TestConnectWithDnsLookupIPv6() {
61  // TODO: Enable this when DNS resolution supports IPv6.
62  LOG(LS_INFO) << "Skipping IPv6 DNS test";
63  // ConnectWithDnsLookupInternal(kIPv6Loopback, "localhost6");
64}
65
66void SocketTest::TestConnectFailIPv4() {
67  ConnectFailInternal(kIPv4Loopback);
68}
69
70void SocketTest::TestConnectFailIPv6() {
71  MAYBE_SKIP_IPV6;
72  ConnectFailInternal(kIPv6Loopback);
73}
74
75void SocketTest::TestConnectWithDnsLookupFailIPv4() {
76  ConnectWithDnsLookupFailInternal(kIPv4Loopback);
77}
78
79void SocketTest::TestConnectWithDnsLookupFailIPv6() {
80  MAYBE_SKIP_IPV6;
81  ConnectWithDnsLookupFailInternal(kIPv6Loopback);
82}
83
84void SocketTest::TestConnectWithClosedSocketIPv4() {
85  ConnectWithClosedSocketInternal(kIPv4Loopback);
86}
87
88void SocketTest::TestConnectWithClosedSocketIPv6() {
89  MAYBE_SKIP_IPV6;
90  ConnectWithClosedSocketInternal(kIPv6Loopback);
91}
92
93void SocketTest::TestConnectWhileNotClosedIPv4() {
94  ConnectWhileNotClosedInternal(kIPv4Loopback);
95}
96
97void SocketTest::TestConnectWhileNotClosedIPv6() {
98  MAYBE_SKIP_IPV6;
99  ConnectWhileNotClosedInternal(kIPv6Loopback);
100}
101
102void SocketTest::TestServerCloseDuringConnectIPv4() {
103  ServerCloseDuringConnectInternal(kIPv4Loopback);
104}
105
106void SocketTest::TestServerCloseDuringConnectIPv6() {
107  MAYBE_SKIP_IPV6;
108  ServerCloseDuringConnectInternal(kIPv6Loopback);
109}
110
111void SocketTest::TestClientCloseDuringConnectIPv4() {
112  ClientCloseDuringConnectInternal(kIPv4Loopback);
113}
114
115void SocketTest::TestClientCloseDuringConnectIPv6() {
116  MAYBE_SKIP_IPV6;
117  ClientCloseDuringConnectInternal(kIPv6Loopback);
118}
119
120void SocketTest::TestServerCloseIPv4() {
121  ServerCloseInternal(kIPv4Loopback);
122}
123
124void SocketTest::TestServerCloseIPv6() {
125  MAYBE_SKIP_IPV6;
126  ServerCloseInternal(kIPv6Loopback);
127}
128
129void SocketTest::TestCloseInClosedCallbackIPv4() {
130  CloseInClosedCallbackInternal(kIPv4Loopback);
131}
132
133void SocketTest::TestCloseInClosedCallbackIPv6() {
134  MAYBE_SKIP_IPV6;
135  CloseInClosedCallbackInternal(kIPv6Loopback);
136}
137
138void SocketTest::TestSocketServerWaitIPv4() {
139  SocketServerWaitInternal(kIPv4Loopback);
140}
141
142void SocketTest::TestSocketServerWaitIPv6() {
143  MAYBE_SKIP_IPV6;
144  SocketServerWaitInternal(kIPv6Loopback);
145}
146
147void SocketTest::TestTcpIPv4() {
148  TcpInternal(kIPv4Loopback);
149}
150
151void SocketTest::TestTcpIPv6() {
152  MAYBE_SKIP_IPV6;
153  TcpInternal(kIPv6Loopback);
154}
155
156void SocketTest::TestSingleFlowControlCallbackIPv4() {
157  SingleFlowControlCallbackInternal(kIPv4Loopback);
158}
159
160void SocketTest::TestSingleFlowControlCallbackIPv6() {
161  MAYBE_SKIP_IPV6;
162  SingleFlowControlCallbackInternal(kIPv6Loopback);
163}
164
165void SocketTest::TestUdpIPv4() {
166  UdpInternal(kIPv4Loopback);
167}
168
169void SocketTest::TestUdpIPv6() {
170  MAYBE_SKIP_IPV6;
171  UdpInternal(kIPv6Loopback);
172}
173
174void SocketTest::TestUdpReadyToSendIPv4() {
175#if !defined(OSX)
176  // TODO(ronghuawu): Enable this test (currently failed on build bots) on mac.
177  UdpReadyToSend(kIPv4Loopback);
178#endif
179}
180
181void SocketTest::TestUdpReadyToSendIPv6() {
182#if defined(WIN32)
183  // TODO(ronghuawu): Enable this test (currently flakey) on mac and linux.
184  MAYBE_SKIP_IPV6;
185  UdpReadyToSend(kIPv6Loopback);
186#endif
187}
188
189void SocketTest::TestGetSetOptionsIPv4() {
190  GetSetOptionsInternal(kIPv4Loopback);
191}
192
193void SocketTest::TestGetSetOptionsIPv6() {
194  MAYBE_SKIP_IPV6;
195  GetSetOptionsInternal(kIPv6Loopback);
196}
197
198// For unbound sockets, GetLocalAddress / GetRemoteAddress return AF_UNSPEC
199// values on Windows, but an empty address of the same family on Linux/MacOS X.
200bool IsUnspecOrEmptyIP(const IPAddress& address) {
201#ifndef WIN32
202  return IPIsAny(address);
203#else
204  return address.family() == AF_UNSPEC;
205#endif
206}
207
208void SocketTest::ConnectInternal(const IPAddress& loopback) {
209  testing::StreamSink sink;
210  SocketAddress accept_addr;
211
212  // Create client.
213  scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(loopback.family(),
214                                                        SOCK_STREAM));
215  sink.Monitor(client.get());
216  EXPECT_EQ(AsyncSocket::CS_CLOSED, client->GetState());
217  EXPECT_PRED1(IsUnspecOrEmptyIP, client->GetLocalAddress().ipaddr());
218
219  // Create server and listen.
220  scoped_ptr<AsyncSocket> server(
221      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
222  sink.Monitor(server.get());
223  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
224  EXPECT_EQ(0, server->Listen(5));
225  EXPECT_EQ(AsyncSocket::CS_CONNECTING, server->GetState());
226
227  // Ensure no pending server connections, since we haven't done anything yet.
228  EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
229  EXPECT_TRUE(NULL == server->Accept(&accept_addr));
230  EXPECT_TRUE(accept_addr.IsNil());
231
232  // Attempt connect to listening socket.
233  EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
234  EXPECT_FALSE(client->GetLocalAddress().IsNil());
235  EXPECT_NE(server->GetLocalAddress(), client->GetLocalAddress());
236
237  // Client is connecting, outcome not yet determined.
238  EXPECT_EQ(AsyncSocket::CS_CONNECTING, client->GetState());
239  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_OPEN));
240  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
241
242  // Server has pending connection, accept it.
243  EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
244  scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
245  ASSERT_TRUE(accepted);
246  EXPECT_FALSE(accept_addr.IsNil());
247  EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
248
249  // Connected from server perspective, check the addresses are correct.
250  EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
251  EXPECT_EQ(server->GetLocalAddress(), accepted->GetLocalAddress());
252  EXPECT_EQ(client->GetLocalAddress(), accepted->GetRemoteAddress());
253
254  // Connected from client perspective, check the addresses are correct.
255  EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
256  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
257  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
258  EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
259  EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
260}
261
262void SocketTest::ConnectWithDnsLookupInternal(const IPAddress& loopback,
263                                              const std::string& host) {
264  testing::StreamSink sink;
265  SocketAddress accept_addr;
266
267  // Create client.
268  scoped_ptr<AsyncSocket> client(
269      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
270  sink.Monitor(client.get());
271
272  // Create server and listen.
273  scoped_ptr<AsyncSocket> server(
274      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
275  sink.Monitor(server.get());
276  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
277  EXPECT_EQ(0, server->Listen(5));
278
279  // Attempt connect to listening socket.
280  SocketAddress dns_addr(server->GetLocalAddress());
281  dns_addr.SetIP(host);
282  EXPECT_EQ(0, client->Connect(dns_addr));
283  // TODO: Bind when doing DNS lookup.
284  //EXPECT_NE(kEmptyAddr, client->GetLocalAddress());  // Implicit Bind
285
286  // Client is connecting, outcome not yet determined.
287  EXPECT_EQ(AsyncSocket::CS_CONNECTING, client->GetState());
288  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_OPEN));
289  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
290
291  // Server has pending connection, accept it.
292  EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
293  scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
294  ASSERT_TRUE(accepted);
295  EXPECT_FALSE(accept_addr.IsNil());
296  EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
297
298  // Connected from server perspective, check the addresses are correct.
299  EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
300  EXPECT_EQ(server->GetLocalAddress(), accepted->GetLocalAddress());
301  EXPECT_EQ(client->GetLocalAddress(), accepted->GetRemoteAddress());
302
303  // Connected from client perspective, check the addresses are correct.
304  EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
305  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
306  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
307  EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
308  EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
309}
310
311void SocketTest::ConnectFailInternal(const IPAddress& loopback) {
312  testing::StreamSink sink;
313  SocketAddress accept_addr;
314
315  // Create client.
316  scoped_ptr<AsyncSocket> client(
317      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
318  sink.Monitor(client.get());
319
320  // Create server, but don't listen yet.
321  scoped_ptr<AsyncSocket> server(
322      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
323  sink.Monitor(server.get());
324  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
325
326  // Attempt connect to a non-existent socket.
327  // We don't connect to the server socket created above, since on
328  // MacOS it takes about 75 seconds to get back an error!
329  SocketAddress bogus_addr(loopback, 65535);
330  EXPECT_EQ(0, client->Connect(bogus_addr));
331
332  // Wait for connection to fail (ECONNREFUSED).
333  EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
334  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_OPEN));
335  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
336  EXPECT_TRUE(client->GetRemoteAddress().IsNil());
337
338  // Should be no pending server connections.
339  EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
340  EXPECT_TRUE(NULL == server->Accept(&accept_addr));
341  EXPECT_EQ(IPAddress(), accept_addr.ipaddr());
342}
343
344void SocketTest::ConnectWithDnsLookupFailInternal(const IPAddress& loopback) {
345  testing::StreamSink sink;
346  SocketAddress accept_addr;
347
348  // Create client.
349  scoped_ptr<AsyncSocket> client(
350      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
351  sink.Monitor(client.get());
352
353  // Create server, but don't listen yet.
354  scoped_ptr<AsyncSocket> server(
355      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
356  sink.Monitor(server.get());
357  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
358
359  // Attempt connect to a non-existent host.
360  // We don't connect to the server socket created above, since on
361  // MacOS it takes about 75 seconds to get back an error!
362  SocketAddress bogus_dns_addr("not-a-real-hostname", 65535);
363  EXPECT_EQ(0, client->Connect(bogus_dns_addr));
364
365  // Wait for connection to fail (EHOSTNOTFOUND).
366  bool dns_lookup_finished = false;
367  WAIT_(client->GetState() == AsyncSocket::CS_CLOSED, kTimeout,
368        dns_lookup_finished);
369  if (!dns_lookup_finished) {
370    LOG(LS_WARNING) << "Skipping test; DNS resolution took longer than 5 "
371                    << "seconds.";
372    return;
373  }
374
375  EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
376  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_OPEN));
377  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
378  EXPECT_TRUE(client->GetRemoteAddress().IsNil());
379  // Should be no pending server connections.
380  EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
381  EXPECT_TRUE(NULL == server->Accept(&accept_addr));
382  EXPECT_TRUE(accept_addr.IsNil());
383}
384
385void SocketTest::ConnectWithClosedSocketInternal(const IPAddress& loopback) {
386  // Create server and listen.
387  scoped_ptr<AsyncSocket> server(
388      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
389  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
390  EXPECT_EQ(0, server->Listen(5));
391
392  // Create a client and put in to CS_CLOSED state.
393  scoped_ptr<AsyncSocket> client(
394      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
395  EXPECT_EQ(0, client->Close());
396  EXPECT_EQ(AsyncSocket::CS_CLOSED, client->GetState());
397
398  // Connect() should reinitialize the socket, and put it in to CS_CONNECTING.
399  EXPECT_EQ(0, client->Connect(SocketAddress(server->GetLocalAddress())));
400  EXPECT_EQ(AsyncSocket::CS_CONNECTING, client->GetState());
401}
402
403void SocketTest::ConnectWhileNotClosedInternal(const IPAddress& loopback) {
404  // Create server and listen.
405  testing::StreamSink sink;
406  scoped_ptr<AsyncSocket> server(
407      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
408  sink.Monitor(server.get());
409  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
410  EXPECT_EQ(0, server->Listen(5));
411  // Create client, connect.
412  scoped_ptr<AsyncSocket> client(
413      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
414  EXPECT_EQ(0, client->Connect(SocketAddress(server->GetLocalAddress())));
415  EXPECT_EQ(AsyncSocket::CS_CONNECTING, client->GetState());
416  // Try to connect again. Should fail, but not interfere with original attempt.
417  EXPECT_EQ(SOCKET_ERROR,
418            client->Connect(SocketAddress(server->GetLocalAddress())));
419
420  // Accept the original connection.
421  SocketAddress accept_addr;
422  EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
423  scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
424  ASSERT_TRUE(accepted);
425  EXPECT_FALSE(accept_addr.IsNil());
426
427  // Check the states and addresses.
428  EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
429  EXPECT_EQ(server->GetLocalAddress(), accepted->GetLocalAddress());
430  EXPECT_EQ(client->GetLocalAddress(), accepted->GetRemoteAddress());
431  EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
432  EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
433  EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
434
435  // Try to connect again, to an unresolved hostname.
436  // Shouldn't break anything.
437  EXPECT_EQ(SOCKET_ERROR,
438            client->Connect(SocketAddress("localhost",
439                                          server->GetLocalAddress().port())));
440  EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
441  EXPECT_EQ(AsyncSocket::CS_CONNECTED, client->GetState());
442  EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
443  EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
444}
445
446void SocketTest::ServerCloseDuringConnectInternal(const IPAddress& loopback) {
447  testing::StreamSink sink;
448
449  // Create client.
450  scoped_ptr<AsyncSocket> client(
451      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
452  sink.Monitor(client.get());
453
454  // Create server and listen.
455  scoped_ptr<AsyncSocket> server(
456      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
457  sink.Monitor(server.get());
458  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
459  EXPECT_EQ(0, server->Listen(5));
460
461  // Attempt connect to listening socket.
462  EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
463
464  // Close down the server while the socket is in the accept queue.
465  EXPECT_TRUE_WAIT(sink.Check(server.get(), testing::SSE_READ), kTimeout);
466  server->Close();
467
468  // This should fail the connection for the client. Clean up.
469  EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
470  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
471  client->Close();
472}
473
474void SocketTest::ClientCloseDuringConnectInternal(const IPAddress& loopback) {
475  testing::StreamSink sink;
476  SocketAddress accept_addr;
477
478  // Create client.
479  scoped_ptr<AsyncSocket> client(
480      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
481  sink.Monitor(client.get());
482
483  // Create server and listen.
484  scoped_ptr<AsyncSocket> server(
485      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
486  sink.Monitor(server.get());
487  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
488  EXPECT_EQ(0, server->Listen(5));
489
490  // Attempt connect to listening socket.
491  EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
492
493  // Close down the client while the socket is in the accept queue.
494  EXPECT_TRUE_WAIT(sink.Check(server.get(), testing::SSE_READ), kTimeout);
495  client->Close();
496
497  // The connection should still be able to be accepted.
498  scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
499  ASSERT_TRUE(accepted);
500  sink.Monitor(accepted.get());
501  EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
502
503  // The accepted socket should then close (possibly with err, timing-related)
504  EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, accepted->GetState(), kTimeout);
505  EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE) ||
506              sink.Check(accepted.get(), testing::SSE_ERROR));
507
508  // The client should not get a close event.
509  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
510}
511
512void SocketTest::ServerCloseInternal(const IPAddress& loopback) {
513  testing::StreamSink sink;
514  SocketAddress accept_addr;
515
516  // Create client.
517  scoped_ptr<AsyncSocket> client(
518      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
519  sink.Monitor(client.get());
520
521  // Create server and listen.
522  scoped_ptr<AsyncSocket> server(
523      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
524  sink.Monitor(server.get());
525  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
526  EXPECT_EQ(0, server->Listen(5));
527
528  // Attempt connection.
529  EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
530
531  // Accept connection.
532  EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
533  scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
534  ASSERT_TRUE(accepted);
535  sink.Monitor(accepted.get());
536
537  // Both sides are now connected.
538  EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
539  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
540  EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
541  EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
542
543  // Send data to the client, and then close the connection.
544  EXPECT_EQ(1, accepted->Send("a", 1));
545  accepted->Close();
546  EXPECT_EQ(AsyncSocket::CS_CLOSED, accepted->GetState());
547
548  // Expect that the client is notified, and has not yet closed.
549  EXPECT_TRUE_WAIT(sink.Check(client.get(), testing::SSE_READ), kTimeout);
550  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
551  EXPECT_EQ(AsyncSocket::CS_CONNECTED, client->GetState());
552
553  // Ensure the data can be read.
554  char buffer[10];
555  EXPECT_EQ(1, client->Recv(buffer, sizeof(buffer)));
556  EXPECT_EQ('a', buffer[0]);
557
558  // Now we should close, but the remote address will remain.
559  EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
560  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_CLOSE));
561  EXPECT_FALSE(client->GetRemoteAddress().IsAnyIP());
562
563  // The closer should not get a close signal.
564  EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_CLOSE));
565  EXPECT_TRUE(accepted->GetRemoteAddress().IsNil());
566
567  // And the closee should only get a single signal.
568  Thread::Current()->ProcessMessages(0);
569  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
570
571  // Close down the client and ensure all is good.
572  client->Close();
573  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
574  EXPECT_TRUE(client->GetRemoteAddress().IsNil());
575}
576
577class SocketCloser : public sigslot::has_slots<> {
578 public:
579  void OnClose(AsyncSocket* socket, int error) {
580    socket->Close();  // Deleting here would blow up the vector of handlers
581                      // for the socket's signal.
582  }
583};
584
585void SocketTest::CloseInClosedCallbackInternal(const IPAddress& loopback) {
586  testing::StreamSink sink;
587  SocketCloser closer;
588  SocketAddress accept_addr;
589
590  // Create client.
591  scoped_ptr<AsyncSocket> client(
592      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
593  sink.Monitor(client.get());
594  client->SignalCloseEvent.connect(&closer, &SocketCloser::OnClose);
595
596  // Create server and listen.
597  scoped_ptr<AsyncSocket> server(
598      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
599  sink.Monitor(server.get());
600  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
601  EXPECT_EQ(0, server->Listen(5));
602
603  // Attempt connection.
604  EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
605
606  // Accept connection.
607  EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
608  scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
609  ASSERT_TRUE(accepted);
610  sink.Monitor(accepted.get());
611
612  // Both sides are now connected.
613  EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
614  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
615  EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
616  EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
617
618  // Send data to the client, and then close the connection.
619  accepted->Close();
620  EXPECT_EQ(AsyncSocket::CS_CLOSED, accepted->GetState());
621
622  // Expect that the client is notified, and has not yet closed.
623  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
624  EXPECT_EQ(AsyncSocket::CS_CONNECTED, client->GetState());
625
626  // Now we should be closed and invalidated
627  EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
628  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_CLOSE));
629  EXPECT_TRUE(Socket::CS_CLOSED == client->GetState());
630}
631
632class Sleeper : public MessageHandler {
633 public:
634  Sleeper() {}
635  void OnMessage(Message* msg) {
636    Thread::Current()->SleepMs(500);
637  }
638};
639
640void SocketTest::SocketServerWaitInternal(const IPAddress& loopback) {
641  testing::StreamSink sink;
642  SocketAddress accept_addr;
643
644  // Create & connect server and client sockets.
645  scoped_ptr<AsyncSocket> client(
646      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
647  scoped_ptr<AsyncSocket> server(
648      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
649  sink.Monitor(client.get());
650  sink.Monitor(server.get());
651  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
652  EXPECT_EQ(0, server->Listen(5));
653
654  EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
655  EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
656
657  scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
658  ASSERT_TRUE(accepted);
659  sink.Monitor(accepted.get());
660  EXPECT_EQ(AsyncSocket::CS_CONNECTED, accepted->GetState());
661  EXPECT_EQ(server->GetLocalAddress(), accepted->GetLocalAddress());
662  EXPECT_EQ(client->GetLocalAddress(), accepted->GetRemoteAddress());
663
664  EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
665  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
666  EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
667  EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
668  EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
669
670  // Do an i/o operation, triggering an eventual callback.
671  EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ));
672  char buf[1024] = {0};
673
674  EXPECT_EQ(1024, client->Send(buf, 1024));
675  EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ));
676
677  // Shouldn't signal when blocked in a thread Send, where process_io is false.
678  scoped_ptr<Thread> thread(new Thread());
679  thread->Start();
680  Sleeper sleeper;
681  TypedMessageData<AsyncSocket*> data(client.get());
682  thread->Send(&sleeper, 0, &data);
683  EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ));
684
685  // But should signal when process_io is true.
686  EXPECT_TRUE_WAIT((sink.Check(accepted.get(), testing::SSE_READ)), kTimeout);
687  EXPECT_LT(0, accepted->Recv(buf, 1024));
688}
689
690void SocketTest::TcpInternal(const IPAddress& loopback) {
691  testing::StreamSink sink;
692  SocketAddress accept_addr;
693
694  // Create test data.
695  const size_t kDataSize = 1024 * 1024;
696  scoped_array<char> send_buffer(new char[kDataSize]);
697  scoped_array<char> recv_buffer(new char[kDataSize]);
698  size_t send_pos = 0, recv_pos = 0;
699  for (size_t i = 0; i < kDataSize; ++i) {
700    send_buffer[i] = static_cast<char>(i % 256);
701    recv_buffer[i] = 0;
702  }
703
704  // Create client.
705  scoped_ptr<AsyncSocket> client(
706      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
707  sink.Monitor(client.get());
708
709  // Create server and listen.
710  scoped_ptr<AsyncSocket> server(
711      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
712  sink.Monitor(server.get());
713  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
714  EXPECT_EQ(0, server->Listen(5));
715
716  // Attempt connection.
717  EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
718
719  // Accept connection.
720  EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
721  scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
722  ASSERT_TRUE(accepted);
723  sink.Monitor(accepted.get());
724
725  // Both sides are now connected.
726  EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
727  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
728  EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
729  EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
730
731  // Send and receive a bunch of data.
732  bool send_waiting_for_writability = false;
733  bool send_expect_success = true;
734  bool recv_waiting_for_readability = true;
735  bool recv_expect_success = false;
736  int data_in_flight = 0;
737  while (recv_pos < kDataSize) {
738    // Send as much as we can if we've been cleared to send.
739    while (!send_waiting_for_writability && send_pos < kDataSize) {
740      int tosend = static_cast<int>(kDataSize - send_pos);
741      int sent = accepted->Send(send_buffer.get() + send_pos, tosend);
742      if (send_expect_success) {
743        // The first Send() after connecting or getting writability should
744        // succeed and send some data.
745        EXPECT_GT(sent, 0);
746        send_expect_success = false;
747      }
748      if (sent >= 0) {
749        EXPECT_LE(sent, tosend);
750        send_pos += sent;
751        data_in_flight += sent;
752      } else {
753        ASSERT_TRUE(accepted->IsBlocking());
754        send_waiting_for_writability = true;
755      }
756    }
757
758    // Read all the sent data.
759    while (data_in_flight > 0) {
760      if (recv_waiting_for_readability) {
761        // Wait until data is available.
762        EXPECT_TRUE_WAIT(sink.Check(client.get(), testing::SSE_READ), kTimeout);
763        recv_waiting_for_readability = false;
764        recv_expect_success = true;
765      }
766
767      // Receive as much as we can get in a single recv call.
768      int rcvd = client->Recv(recv_buffer.get() + recv_pos,
769                              kDataSize - recv_pos);
770
771      if (recv_expect_success) {
772        // The first Recv() after getting readability should succeed and receive
773        // some data.
774        // TODO: The following line is disabled due to flakey pulse
775        //     builds.  Re-enable if/when possible.
776        // EXPECT_GT(rcvd, 0);
777        recv_expect_success = false;
778      }
779      if (rcvd >= 0) {
780        EXPECT_LE(rcvd, data_in_flight);
781        recv_pos += rcvd;
782        data_in_flight -= rcvd;
783      } else {
784        ASSERT_TRUE(client->IsBlocking());
785        recv_waiting_for_readability = true;
786      }
787    }
788
789    // Once all that we've sent has been rcvd, expect to be able to send again.
790    if (send_waiting_for_writability) {
791      EXPECT_TRUE_WAIT(sink.Check(accepted.get(), testing::SSE_WRITE),
792                       kTimeout);
793      send_waiting_for_writability = false;
794      send_expect_success = true;
795    }
796  }
797
798  // The received data matches the sent data.
799  EXPECT_EQ(kDataSize, send_pos);
800  EXPECT_EQ(kDataSize, recv_pos);
801  EXPECT_EQ(0, memcmp(recv_buffer.get(), send_buffer.get(), kDataSize));
802
803  // Close down.
804  accepted->Close();
805  EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout);
806  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_CLOSE));
807  client->Close();
808}
809
810void SocketTest::SingleFlowControlCallbackInternal(const IPAddress& loopback) {
811  testing::StreamSink sink;
812  SocketAddress accept_addr;
813
814  // Create client.
815  scoped_ptr<AsyncSocket> client(
816      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
817  sink.Monitor(client.get());
818
819  // Create server and listen.
820  scoped_ptr<AsyncSocket> server(
821      ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
822  sink.Monitor(server.get());
823  EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
824  EXPECT_EQ(0, server->Listen(5));
825
826  // Attempt connection.
827  EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
828
829  // Accept connection.
830  EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
831  scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
832  ASSERT_TRUE(accepted);
833  sink.Monitor(accepted.get());
834
835  // Both sides are now connected.
836  EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout);
837  EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN));
838  EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
839  EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
840
841  // Expect a writable callback from the connect.
842  EXPECT_TRUE_WAIT(sink.Check(accepted.get(), testing::SSE_WRITE), kTimeout);
843
844  // Fill the socket buffer.
845  char buf[1024 * 16] = {0};
846  int sends = 0;
847  while (++sends && accepted->Send(&buf, ARRAY_SIZE(buf)) != -1) {}
848  EXPECT_TRUE(accepted->IsBlocking());
849
850  // Wait until data is available.
851  EXPECT_TRUE_WAIT(sink.Check(client.get(), testing::SSE_READ), kTimeout);
852
853  // Pull data.
854  for (int i = 0; i < sends; ++i) {
855    client->Recv(buf, ARRAY_SIZE(buf));
856  }
857
858  // Expect at least one additional writable callback.
859  EXPECT_TRUE_WAIT(sink.Check(accepted.get(), testing::SSE_WRITE), kTimeout);
860
861  // Adding data in response to the writeable callback shouldn't cause infinite
862  // callbacks.
863  int extras = 0;
864  for (int i = 0; i < 100; ++i) {
865    accepted->Send(&buf, ARRAY_SIZE(buf));
866    talk_base::Thread::Current()->ProcessMessages(1);
867    if (sink.Check(accepted.get(), testing::SSE_WRITE)) {
868      extras++;
869    }
870  }
871  EXPECT_LT(extras, 2);
872
873  // Close down.
874  accepted->Close();
875  client->Close();
876}
877
878void SocketTest::UdpInternal(const IPAddress& loopback) {
879  SocketAddress empty = EmptySocketAddressWithFamily(loopback.family());
880  // Test basic bind and connect behavior.
881  AsyncSocket* socket =
882      ss_->CreateAsyncSocket(loopback.family(), SOCK_DGRAM);
883  EXPECT_EQ(AsyncSocket::CS_CLOSED, socket->GetState());
884  EXPECT_EQ(0, socket->Bind(SocketAddress(loopback, 0)));
885  SocketAddress addr1 = socket->GetLocalAddress();
886  EXPECT_EQ(0, socket->Connect(addr1));
887  EXPECT_EQ(AsyncSocket::CS_CONNECTED, socket->GetState());
888  socket->Close();
889  EXPECT_EQ(AsyncSocket::CS_CLOSED, socket->GetState());
890  delete socket;
891
892  // Test send/receive behavior.
893  scoped_ptr<TestClient> client1(
894      new TestClient(AsyncUDPSocket::Create(ss_, addr1)));
895  scoped_ptr<TestClient> client2(
896      new TestClient(AsyncUDPSocket::Create(ss_, empty)));
897
898  SocketAddress addr2;
899  EXPECT_EQ(3, client2->SendTo("foo", 3, addr1));
900  EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &addr2));
901
902  SocketAddress addr3;
903  EXPECT_EQ(6, client1->SendTo("bizbaz", 6, addr2));
904  EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &addr3));
905  EXPECT_EQ(addr3, addr1);
906  // TODO: figure out what the intent is here
907  for (int i = 0; i < 10; ++i) {
908    client2.reset(new TestClient(AsyncUDPSocket::Create(ss_, empty)));
909
910    SocketAddress addr4;
911    EXPECT_EQ(3, client2->SendTo("foo", 3, addr1));
912    EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &addr4));
913    EXPECT_EQ(addr4.ipaddr(), addr2.ipaddr());
914
915    SocketAddress addr5;
916    EXPECT_EQ(6, client1->SendTo("bizbaz", 6, addr4));
917    EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &addr5));
918    EXPECT_EQ(addr5, addr1);
919
920    addr2 = addr4;
921  }
922}
923
924void SocketTest::UdpReadyToSend(const IPAddress& loopback) {
925  SocketAddress empty = EmptySocketAddressWithFamily(loopback.family());
926  // RFC 5737 - The blocks 192.0.2.0/24 (TEST-NET-1) ... are provided for use in
927  // documentation.
928  // RFC 3849 - 2001:DB8::/32 as a documentation-only prefix.
929  std::string dest = (loopback.family() == AF_INET6) ?
930      "2001:db8::1" : "192.0.2.0";
931  SocketAddress test_addr(dest, 2345);
932
933  // Test send
934  scoped_ptr<TestClient> client(
935      new TestClient(AsyncUDPSocket::Create(ss_, empty)));
936  int test_packet_size = 1200;
937  talk_base::scoped_array<char> test_packet(new char[test_packet_size]);
938  // Init the test packet just to avoid memcheck warning.
939  memset(test_packet.get(), 0, test_packet_size);
940  // Set the send buffer size to the same size as the test packet to have a
941  // better chance to get EWOULDBLOCK.
942  int send_buffer_size = test_packet_size;
943#if defined(LINUX)
944  send_buffer_size /= 2;
945#endif
946  client->SetOption(talk_base::Socket::OPT_SNDBUF, send_buffer_size);
947
948  int error = 0;
949  uint32 start_ms = Time();
950  int sent_packet_num = 0;
951  int expected_error = EWOULDBLOCK;
952  while (start_ms + kTimeout > Time()) {
953    int ret = client->SendTo(test_packet.get(), test_packet_size, test_addr);
954    ++sent_packet_num;
955    if (ret != test_packet_size) {
956      error = client->GetError();
957      if (error == expected_error) {
958        LOG(LS_INFO) << "Got expected error code after sending "
959                     << sent_packet_num << " packets.";
960        break;
961      }
962    }
963  }
964  EXPECT_EQ(expected_error, error);
965  EXPECT_FALSE(client->ready_to_send());
966  EXPECT_TRUE_WAIT(client->ready_to_send(), kTimeout);
967  LOG(LS_INFO) << "Got SignalReadyToSend";
968}
969
970void SocketTest::GetSetOptionsInternal(const IPAddress& loopback) {
971  talk_base::scoped_ptr<AsyncSocket> socket(
972      ss_->CreateAsyncSocket(loopback.family(), SOCK_DGRAM));
973  socket->Bind(SocketAddress(loopback, 0));
974
975  // Check SNDBUF/RCVBUF.
976  const int desired_size = 12345;
977#if defined(LINUX) || defined(ANDROID)
978  // Yes, really.  It's in the kernel source.
979  const int expected_size = desired_size * 2;
980#else   // !LINUX && !ANDROID
981  const int expected_size = desired_size;
982#endif  // !LINUX && !ANDROID
983  int recv_size = 0;
984  int send_size = 0;
985  // get the initial sizes
986  ASSERT_NE(-1, socket->GetOption(Socket::OPT_RCVBUF, &recv_size));
987  ASSERT_NE(-1, socket->GetOption(Socket::OPT_SNDBUF, &send_size));
988  // set our desired sizes
989  ASSERT_NE(-1, socket->SetOption(Socket::OPT_RCVBUF, desired_size));
990  ASSERT_NE(-1, socket->SetOption(Socket::OPT_SNDBUF, desired_size));
991  // get the sizes again
992  ASSERT_NE(-1, socket->GetOption(Socket::OPT_RCVBUF, &recv_size));
993  ASSERT_NE(-1, socket->GetOption(Socket::OPT_SNDBUF, &send_size));
994  // make sure they are right
995  ASSERT_EQ(expected_size, recv_size);
996  ASSERT_EQ(expected_size, send_size);
997
998  // Check that we can't set NODELAY on a UDP socket.
999  int current_nd, desired_nd = 1;
1000  ASSERT_EQ(-1, socket->GetOption(Socket::OPT_NODELAY, &current_nd));
1001  ASSERT_EQ(-1, socket->SetOption(Socket::OPT_NODELAY, desired_nd));
1002
1003  // Skip the esimate MTU test for IPv6 for now.
1004  if (loopback.family() != AF_INET6) {
1005    // Try estimating MTU.
1006    talk_base::scoped_ptr<AsyncSocket>
1007        mtu_socket(
1008            ss_->CreateAsyncSocket(loopback.family(), SOCK_DGRAM));
1009    mtu_socket->Bind(SocketAddress(loopback, 0));
1010    uint16 mtu;
1011    // should fail until we connect
1012    ASSERT_EQ(-1, mtu_socket->EstimateMTU(&mtu));
1013    mtu_socket->Connect(SocketAddress(loopback, 0));
1014#if defined(WIN32)
1015    // now it should succeed
1016    ASSERT_NE(-1, mtu_socket->EstimateMTU(&mtu));
1017    ASSERT_GE(mtu, 1492);  // should be at least the 1492 "plateau" on localhost
1018#elif defined(OSX)
1019    // except on OSX, where it's not yet implemented
1020    ASSERT_EQ(-1, mtu_socket->EstimateMTU(&mtu));
1021#else
1022    // and the behavior seems unpredictable on Linux,
1023    // failing on the build machine
1024    // but succeeding on my Ubiquity instance.
1025#endif
1026  }
1027}
1028
1029}  // namespace talk_base
1030