1import asyncore 2import unittest 3import select 4import os 5import socket 6import sys 7import time 8import errno 9import struct 10 11from test import support 12from io import BytesIO 13 14if support.PGO: 15 raise unittest.SkipTest("test is not helpful for PGO") 16 17try: 18 import threading 19except ImportError: 20 threading = None 21 22TIMEOUT = 3 23HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX') 24 25class dummysocket: 26 def __init__(self): 27 self.closed = False 28 29 def close(self): 30 self.closed = True 31 32 def fileno(self): 33 return 42 34 35class dummychannel: 36 def __init__(self): 37 self.socket = dummysocket() 38 39 def close(self): 40 self.socket.close() 41 42class exitingdummy: 43 def __init__(self): 44 pass 45 46 def handle_read_event(self): 47 raise asyncore.ExitNow() 48 49 handle_write_event = handle_read_event 50 handle_close = handle_read_event 51 handle_expt_event = handle_read_event 52 53class crashingdummy: 54 def __init__(self): 55 self.error_handled = False 56 57 def handle_read_event(self): 58 raise Exception() 59 60 handle_write_event = handle_read_event 61 handle_close = handle_read_event 62 handle_expt_event = handle_read_event 63 64 def handle_error(self): 65 self.error_handled = True 66 67# used when testing senders; just collects what it gets until newline is sent 68def capture_server(evt, buf, serv): 69 try: 70 serv.listen() 71 conn, addr = serv.accept() 72 except socket.timeout: 73 pass 74 else: 75 n = 200 76 start = time.time() 77 while n > 0 and time.time() - start < 3.0: 78 r, w, e = select.select([conn], [], [], 0.1) 79 if r: 80 n -= 1 81 data = conn.recv(10) 82 # keep everything except for the newline terminator 83 buf.write(data.replace(b'\n', b'')) 84 if b'\n' in data: 85 break 86 time.sleep(0.01) 87 88 conn.close() 89 finally: 90 serv.close() 91 evt.set() 92 93def bind_af_aware(sock, addr): 94 """Helper function to bind a socket according to its family.""" 95 if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: 96 # Make sure the path doesn't exist. 97 support.unlink(addr) 98 support.bind_unix_socket(sock, addr) 99 else: 100 sock.bind(addr) 101 102 103class HelperFunctionTests(unittest.TestCase): 104 def test_readwriteexc(self): 105 # Check exception handling behavior of read, write and _exception 106 107 # check that ExitNow exceptions in the object handler method 108 # bubbles all the way up through asyncore read/write/_exception calls 109 tr1 = exitingdummy() 110 self.assertRaises(asyncore.ExitNow, asyncore.read, tr1) 111 self.assertRaises(asyncore.ExitNow, asyncore.write, tr1) 112 self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1) 113 114 # check that an exception other than ExitNow in the object handler 115 # method causes the handle_error method to get called 116 tr2 = crashingdummy() 117 asyncore.read(tr2) 118 self.assertEqual(tr2.error_handled, True) 119 120 tr2 = crashingdummy() 121 asyncore.write(tr2) 122 self.assertEqual(tr2.error_handled, True) 123 124 tr2 = crashingdummy() 125 asyncore._exception(tr2) 126 self.assertEqual(tr2.error_handled, True) 127 128 # asyncore.readwrite uses constants in the select module that 129 # are not present in Windows systems (see this thread: 130 # http://mail.python.org/pipermail/python-list/2001-October/109973.html) 131 # These constants should be present as long as poll is available 132 133 @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') 134 def test_readwrite(self): 135 # Check that correct methods are called by readwrite() 136 137 attributes = ('read', 'expt', 'write', 'closed', 'error_handled') 138 139 expected = ( 140 (select.POLLIN, 'read'), 141 (select.POLLPRI, 'expt'), 142 (select.POLLOUT, 'write'), 143 (select.POLLERR, 'closed'), 144 (select.POLLHUP, 'closed'), 145 (select.POLLNVAL, 'closed'), 146 ) 147 148 class testobj: 149 def __init__(self): 150 self.read = False 151 self.write = False 152 self.closed = False 153 self.expt = False 154 self.error_handled = False 155 156 def handle_read_event(self): 157 self.read = True 158 159 def handle_write_event(self): 160 self.write = True 161 162 def handle_close(self): 163 self.closed = True 164 165 def handle_expt_event(self): 166 self.expt = True 167 168 def handle_error(self): 169 self.error_handled = True 170 171 for flag, expectedattr in expected: 172 tobj = testobj() 173 self.assertEqual(getattr(tobj, expectedattr), False) 174 asyncore.readwrite(tobj, flag) 175 176 # Only the attribute modified by the routine we expect to be 177 # called should be True. 178 for attr in attributes: 179 self.assertEqual(getattr(tobj, attr), attr==expectedattr) 180 181 # check that ExitNow exceptions in the object handler method 182 # bubbles all the way up through asyncore readwrite call 183 tr1 = exitingdummy() 184 self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) 185 186 # check that an exception other than ExitNow in the object handler 187 # method causes the handle_error method to get called 188 tr2 = crashingdummy() 189 self.assertEqual(tr2.error_handled, False) 190 asyncore.readwrite(tr2, flag) 191 self.assertEqual(tr2.error_handled, True) 192 193 def test_closeall(self): 194 self.closeall_check(False) 195 196 def test_closeall_default(self): 197 self.closeall_check(True) 198 199 def closeall_check(self, usedefault): 200 # Check that close_all() closes everything in a given map 201 202 l = [] 203 testmap = {} 204 for i in range(10): 205 c = dummychannel() 206 l.append(c) 207 self.assertEqual(c.socket.closed, False) 208 testmap[i] = c 209 210 if usedefault: 211 socketmap = asyncore.socket_map 212 try: 213 asyncore.socket_map = testmap 214 asyncore.close_all() 215 finally: 216 testmap, asyncore.socket_map = asyncore.socket_map, socketmap 217 else: 218 asyncore.close_all(testmap) 219 220 self.assertEqual(len(testmap), 0) 221 222 for c in l: 223 self.assertEqual(c.socket.closed, True) 224 225 def test_compact_traceback(self): 226 try: 227 raise Exception("I don't like spam!") 228 except: 229 real_t, real_v, real_tb = sys.exc_info() 230 r = asyncore.compact_traceback() 231 else: 232 self.fail("Expected exception") 233 234 (f, function, line), t, v, info = r 235 self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py') 236 self.assertEqual(function, 'test_compact_traceback') 237 self.assertEqual(t, real_t) 238 self.assertEqual(v, real_v) 239 self.assertEqual(info, '[%s|%s|%s]' % (f, function, line)) 240 241 242class DispatcherTests(unittest.TestCase): 243 def setUp(self): 244 pass 245 246 def tearDown(self): 247 asyncore.close_all() 248 249 def test_basic(self): 250 d = asyncore.dispatcher() 251 self.assertEqual(d.readable(), True) 252 self.assertEqual(d.writable(), True) 253 254 def test_repr(self): 255 d = asyncore.dispatcher() 256 self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d)) 257 258 def test_log(self): 259 d = asyncore.dispatcher() 260 261 # capture output of dispatcher.log() (to stderr) 262 l1 = "Lovely spam! Wonderful spam!" 263 l2 = "I don't like spam!" 264 with support.captured_stderr() as stderr: 265 d.log(l1) 266 d.log(l2) 267 268 lines = stderr.getvalue().splitlines() 269 self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2]) 270 271 def test_log_info(self): 272 d = asyncore.dispatcher() 273 274 # capture output of dispatcher.log_info() (to stdout via print) 275 l1 = "Have you got anything without spam?" 276 l2 = "Why can't she have egg bacon spam and sausage?" 277 l3 = "THAT'S got spam in it!" 278 with support.captured_stdout() as stdout: 279 d.log_info(l1, 'EGGS') 280 d.log_info(l2) 281 d.log_info(l3, 'SPAM') 282 283 lines = stdout.getvalue().splitlines() 284 expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3] 285 self.assertEqual(lines, expected) 286 287 def test_unhandled(self): 288 d = asyncore.dispatcher() 289 d.ignore_log_types = () 290 291 # capture output of dispatcher.log_info() (to stdout via print) 292 with support.captured_stdout() as stdout: 293 d.handle_expt() 294 d.handle_read() 295 d.handle_write() 296 d.handle_connect() 297 298 lines = stdout.getvalue().splitlines() 299 expected = ['warning: unhandled incoming priority event', 300 'warning: unhandled read event', 301 'warning: unhandled write event', 302 'warning: unhandled connect event'] 303 self.assertEqual(lines, expected) 304 305 def test_strerror(self): 306 # refers to bug #8573 307 err = asyncore._strerror(errno.EPERM) 308 if hasattr(os, 'strerror'): 309 self.assertEqual(err, os.strerror(errno.EPERM)) 310 err = asyncore._strerror(-1) 311 self.assertTrue(err != "") 312 313 314class dispatcherwithsend_noread(asyncore.dispatcher_with_send): 315 def readable(self): 316 return False 317 318 def handle_connect(self): 319 pass 320 321 322class DispatcherWithSendTests(unittest.TestCase): 323 def setUp(self): 324 pass 325 326 def tearDown(self): 327 asyncore.close_all() 328 329 @unittest.skipUnless(threading, 'Threading required for this test.') 330 @support.reap_threads 331 def test_send(self): 332 evt = threading.Event() 333 sock = socket.socket() 334 sock.settimeout(3) 335 port = support.bind_port(sock) 336 337 cap = BytesIO() 338 args = (evt, cap, sock) 339 t = threading.Thread(target=capture_server, args=args) 340 t.start() 341 try: 342 # wait a little longer for the server to initialize (it sometimes 343 # refuses connections on slow machines without this wait) 344 time.sleep(0.2) 345 346 data = b"Suppose there isn't a 16-ton weight?" 347 d = dispatcherwithsend_noread() 348 d.create_socket() 349 d.connect((support.HOST, port)) 350 351 # give time for socket to connect 352 time.sleep(0.1) 353 354 d.send(data) 355 d.send(data) 356 d.send(b'\n') 357 358 n = 1000 359 while d.out_buffer and n > 0: 360 asyncore.poll() 361 n -= 1 362 363 evt.wait() 364 365 self.assertEqual(cap.getvalue(), data*2) 366 finally: 367 t.join(timeout=TIMEOUT) 368 if t.is_alive(): 369 self.fail("join() timed out") 370 371 372@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'), 373 'asyncore.file_wrapper required') 374class FileWrapperTest(unittest.TestCase): 375 def setUp(self): 376 self.d = b"It's not dead, it's sleeping!" 377 with open(support.TESTFN, 'wb') as file: 378 file.write(self.d) 379 380 def tearDown(self): 381 support.unlink(support.TESTFN) 382 383 def test_recv(self): 384 fd = os.open(support.TESTFN, os.O_RDONLY) 385 w = asyncore.file_wrapper(fd) 386 os.close(fd) 387 388 self.assertNotEqual(w.fd, fd) 389 self.assertNotEqual(w.fileno(), fd) 390 self.assertEqual(w.recv(13), b"It's not dead") 391 self.assertEqual(w.read(6), b", it's") 392 w.close() 393 self.assertRaises(OSError, w.read, 1) 394 395 def test_send(self): 396 d1 = b"Come again?" 397 d2 = b"I want to buy some cheese." 398 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND) 399 w = asyncore.file_wrapper(fd) 400 os.close(fd) 401 402 w.write(d1) 403 w.send(d2) 404 w.close() 405 with open(support.TESTFN, 'rb') as file: 406 self.assertEqual(file.read(), self.d + d1 + d2) 407 408 @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'), 409 'asyncore.file_dispatcher required') 410 def test_dispatcher(self): 411 fd = os.open(support.TESTFN, os.O_RDONLY) 412 data = [] 413 class FileDispatcher(asyncore.file_dispatcher): 414 def handle_read(self): 415 data.append(self.recv(29)) 416 s = FileDispatcher(fd) 417 os.close(fd) 418 asyncore.loop(timeout=0.01, use_poll=True, count=2) 419 self.assertEqual(b"".join(data), self.d) 420 421 def test_resource_warning(self): 422 # Issue #11453 423 fd = os.open(support.TESTFN, os.O_RDONLY) 424 f = asyncore.file_wrapper(fd) 425 426 os.close(fd) 427 with support.check_warnings(('', ResourceWarning)): 428 f = None 429 support.gc_collect() 430 431 def test_close_twice(self): 432 fd = os.open(support.TESTFN, os.O_RDONLY) 433 f = asyncore.file_wrapper(fd) 434 os.close(fd) 435 436 f.close() 437 self.assertEqual(f.fd, -1) 438 # calling close twice should not fail 439 f.close() 440 441 442class BaseTestHandler(asyncore.dispatcher): 443 444 def __init__(self, sock=None): 445 asyncore.dispatcher.__init__(self, sock) 446 self.flag = False 447 448 def handle_accept(self): 449 raise Exception("handle_accept not supposed to be called") 450 451 def handle_accepted(self): 452 raise Exception("handle_accepted not supposed to be called") 453 454 def handle_connect(self): 455 raise Exception("handle_connect not supposed to be called") 456 457 def handle_expt(self): 458 raise Exception("handle_expt not supposed to be called") 459 460 def handle_close(self): 461 raise Exception("handle_close not supposed to be called") 462 463 def handle_error(self): 464 raise 465 466 467class BaseServer(asyncore.dispatcher): 468 """A server which listens on an address and dispatches the 469 connection to a handler. 470 """ 471 472 def __init__(self, family, addr, handler=BaseTestHandler): 473 asyncore.dispatcher.__init__(self) 474 self.create_socket(family) 475 self.set_reuse_addr() 476 bind_af_aware(self.socket, addr) 477 self.listen(5) 478 self.handler = handler 479 480 @property 481 def address(self): 482 return self.socket.getsockname() 483 484 def handle_accepted(self, sock, addr): 485 self.handler(sock) 486 487 def handle_error(self): 488 raise 489 490 491class BaseClient(BaseTestHandler): 492 493 def __init__(self, family, address): 494 BaseTestHandler.__init__(self) 495 self.create_socket(family) 496 self.connect(address) 497 498 def handle_connect(self): 499 pass 500 501 502class BaseTestAPI: 503 504 def tearDown(self): 505 asyncore.close_all() 506 507 def loop_waiting_for_flag(self, instance, timeout=5): 508 timeout = float(timeout) / 100 509 count = 100 510 while asyncore.socket_map and count > 0: 511 asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll) 512 if instance.flag: 513 return 514 count -= 1 515 time.sleep(timeout) 516 self.fail("flag not set") 517 518 def test_handle_connect(self): 519 # make sure handle_connect is called on connect() 520 521 class TestClient(BaseClient): 522 def handle_connect(self): 523 self.flag = True 524 525 server = BaseServer(self.family, self.addr) 526 client = TestClient(self.family, server.address) 527 self.loop_waiting_for_flag(client) 528 529 def test_handle_accept(self): 530 # make sure handle_accept() is called when a client connects 531 532 class TestListener(BaseTestHandler): 533 534 def __init__(self, family, addr): 535 BaseTestHandler.__init__(self) 536 self.create_socket(family) 537 bind_af_aware(self.socket, addr) 538 self.listen(5) 539 self.address = self.socket.getsockname() 540 541 def handle_accept(self): 542 self.flag = True 543 544 server = TestListener(self.family, self.addr) 545 client = BaseClient(self.family, server.address) 546 self.loop_waiting_for_flag(server) 547 548 def test_handle_accepted(self): 549 # make sure handle_accepted() is called when a client connects 550 551 class TestListener(BaseTestHandler): 552 553 def __init__(self, family, addr): 554 BaseTestHandler.__init__(self) 555 self.create_socket(family) 556 bind_af_aware(self.socket, addr) 557 self.listen(5) 558 self.address = self.socket.getsockname() 559 560 def handle_accept(self): 561 asyncore.dispatcher.handle_accept(self) 562 563 def handle_accepted(self, sock, addr): 564 sock.close() 565 self.flag = True 566 567 server = TestListener(self.family, self.addr) 568 client = BaseClient(self.family, server.address) 569 self.loop_waiting_for_flag(server) 570 571 572 def test_handle_read(self): 573 # make sure handle_read is called on data received 574 575 class TestClient(BaseClient): 576 def handle_read(self): 577 self.flag = True 578 579 class TestHandler(BaseTestHandler): 580 def __init__(self, conn): 581 BaseTestHandler.__init__(self, conn) 582 self.send(b'x' * 1024) 583 584 server = BaseServer(self.family, self.addr, TestHandler) 585 client = TestClient(self.family, server.address) 586 self.loop_waiting_for_flag(client) 587 588 def test_handle_write(self): 589 # make sure handle_write is called 590 591 class TestClient(BaseClient): 592 def handle_write(self): 593 self.flag = True 594 595 server = BaseServer(self.family, self.addr) 596 client = TestClient(self.family, server.address) 597 self.loop_waiting_for_flag(client) 598 599 def test_handle_close(self): 600 # make sure handle_close is called when the other end closes 601 # the connection 602 603 class TestClient(BaseClient): 604 605 def handle_read(self): 606 # in order to make handle_close be called we are supposed 607 # to make at least one recv() call 608 self.recv(1024) 609 610 def handle_close(self): 611 self.flag = True 612 self.close() 613 614 class TestHandler(BaseTestHandler): 615 def __init__(self, conn): 616 BaseTestHandler.__init__(self, conn) 617 self.close() 618 619 server = BaseServer(self.family, self.addr, TestHandler) 620 client = TestClient(self.family, server.address) 621 self.loop_waiting_for_flag(client) 622 623 def test_handle_close_after_conn_broken(self): 624 # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and 625 # #11265). 626 627 data = b'\0' * 128 628 629 class TestClient(BaseClient): 630 631 def handle_write(self): 632 self.send(data) 633 634 def handle_close(self): 635 self.flag = True 636 self.close() 637 638 def handle_expt(self): 639 self.flag = True 640 self.close() 641 642 class TestHandler(BaseTestHandler): 643 644 def handle_read(self): 645 self.recv(len(data)) 646 self.close() 647 648 def writable(self): 649 return False 650 651 server = BaseServer(self.family, self.addr, TestHandler) 652 client = TestClient(self.family, server.address) 653 self.loop_waiting_for_flag(client) 654 655 @unittest.skipIf(sys.platform.startswith("sunos"), 656 "OOB support is broken on Solaris") 657 def test_handle_expt(self): 658 # Make sure handle_expt is called on OOB data received. 659 # Note: this might fail on some platforms as OOB data is 660 # tenuously supported and rarely used. 661 if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: 662 self.skipTest("Not applicable to AF_UNIX sockets.") 663 664 if sys.platform == "darwin" and self.use_poll: 665 self.skipTest("poll may fail on macOS; see issue #28087") 666 667 class TestClient(BaseClient): 668 def handle_expt(self): 669 self.socket.recv(1024, socket.MSG_OOB) 670 self.flag = True 671 672 class TestHandler(BaseTestHandler): 673 def __init__(self, conn): 674 BaseTestHandler.__init__(self, conn) 675 self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB) 676 677 server = BaseServer(self.family, self.addr, TestHandler) 678 client = TestClient(self.family, server.address) 679 self.loop_waiting_for_flag(client) 680 681 def test_handle_error(self): 682 683 class TestClient(BaseClient): 684 def handle_write(self): 685 1.0 / 0 686 def handle_error(self): 687 self.flag = True 688 try: 689 raise 690 except ZeroDivisionError: 691 pass 692 else: 693 raise Exception("exception not raised") 694 695 server = BaseServer(self.family, self.addr) 696 client = TestClient(self.family, server.address) 697 self.loop_waiting_for_flag(client) 698 699 def test_connection_attributes(self): 700 server = BaseServer(self.family, self.addr) 701 client = BaseClient(self.family, server.address) 702 703 # we start disconnected 704 self.assertFalse(server.connected) 705 self.assertTrue(server.accepting) 706 # this can't be taken for granted across all platforms 707 #self.assertFalse(client.connected) 708 self.assertFalse(client.accepting) 709 710 # execute some loops so that client connects to server 711 asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100) 712 self.assertFalse(server.connected) 713 self.assertTrue(server.accepting) 714 self.assertTrue(client.connected) 715 self.assertFalse(client.accepting) 716 717 # disconnect the client 718 client.close() 719 self.assertFalse(server.connected) 720 self.assertTrue(server.accepting) 721 self.assertFalse(client.connected) 722 self.assertFalse(client.accepting) 723 724 # stop serving 725 server.close() 726 self.assertFalse(server.connected) 727 self.assertFalse(server.accepting) 728 729 def test_create_socket(self): 730 s = asyncore.dispatcher() 731 s.create_socket(self.family) 732 self.assertEqual(s.socket.family, self.family) 733 SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0) 734 sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK 735 if hasattr(socket, 'SOCK_CLOEXEC'): 736 self.assertIn(s.socket.type, 737 (sock_type | socket.SOCK_CLOEXEC, sock_type)) 738 else: 739 self.assertEqual(s.socket.type, sock_type) 740 741 def test_bind(self): 742 if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: 743 self.skipTest("Not applicable to AF_UNIX sockets.") 744 s1 = asyncore.dispatcher() 745 s1.create_socket(self.family) 746 s1.bind(self.addr) 747 s1.listen(5) 748 port = s1.socket.getsockname()[1] 749 750 s2 = asyncore.dispatcher() 751 s2.create_socket(self.family) 752 # EADDRINUSE indicates the socket was correctly bound 753 self.assertRaises(OSError, s2.bind, (self.addr[0], port)) 754 755 def test_set_reuse_addr(self): 756 if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX: 757 self.skipTest("Not applicable to AF_UNIX sockets.") 758 sock = socket.socket(self.family) 759 try: 760 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 761 except OSError: 762 unittest.skip("SO_REUSEADDR not supported on this platform") 763 else: 764 # if SO_REUSEADDR succeeded for sock we expect asyncore 765 # to do the same 766 s = asyncore.dispatcher(socket.socket(self.family)) 767 self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET, 768 socket.SO_REUSEADDR)) 769 s.socket.close() 770 s.create_socket(self.family) 771 s.set_reuse_addr() 772 self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET, 773 socket.SO_REUSEADDR)) 774 finally: 775 sock.close() 776 777 @unittest.skipUnless(threading, 'Threading required for this test.') 778 @support.reap_threads 779 def test_quick_connect(self): 780 # see: http://bugs.python.org/issue10340 781 if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())): 782 server = BaseServer(self.family, self.addr) 783 t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, 784 count=500)) 785 t.start() 786 def cleanup(): 787 t.join(timeout=TIMEOUT) 788 if t.is_alive(): 789 self.fail("join() timed out") 790 self.addCleanup(cleanup) 791 792 s = socket.socket(self.family, socket.SOCK_STREAM) 793 s.settimeout(.2) 794 s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, 795 struct.pack('ii', 1, 0)) 796 try: 797 s.connect(server.address) 798 except OSError: 799 pass 800 finally: 801 s.close() 802 803class TestAPI_UseIPv4Sockets(BaseTestAPI): 804 family = socket.AF_INET 805 addr = (support.HOST, 0) 806 807@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required') 808class TestAPI_UseIPv6Sockets(BaseTestAPI): 809 family = socket.AF_INET6 810 addr = (support.HOSTv6, 0) 811 812@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required') 813class TestAPI_UseUnixSockets(BaseTestAPI): 814 if HAS_UNIX_SOCKETS: 815 family = socket.AF_UNIX 816 addr = support.TESTFN 817 818 def tearDown(self): 819 support.unlink(self.addr) 820 BaseTestAPI.tearDown(self) 821 822class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase): 823 use_poll = False 824 825@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') 826class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase): 827 use_poll = True 828 829class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase): 830 use_poll = False 831 832@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') 833class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase): 834 use_poll = True 835 836class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase): 837 use_poll = False 838 839@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required') 840class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase): 841 use_poll = True 842 843if __name__ == "__main__": 844 unittest.main() 845