1# Test the support for SSL and sockets 2 3import sys 4import unittest 5from test import test_support 6import asyncore 7import socket 8import select 9import time 10import gc 11import os 12import errno 13import pprint 14import urllib, urlparse 15import traceback 16import weakref 17import functools 18import platform 19 20from BaseHTTPServer import HTTPServer 21from SimpleHTTPServer import SimpleHTTPRequestHandler 22 23ssl = test_support.import_module("ssl") 24 25HOST = test_support.HOST 26CERTFILE = None 27SVN_PYTHON_ORG_ROOT_CERT = None 28 29def handle_error(prefix): 30 exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) 31 if test_support.verbose: 32 sys.stdout.write(prefix + exc_format) 33 34 35class BasicTests(unittest.TestCase): 36 37 def test_sslwrap_simple(self): 38 # A crude test for the legacy API 39 try: 40 ssl.sslwrap_simple(socket.socket(socket.AF_INET)) 41 except IOError, e: 42 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that 43 pass 44 else: 45 raise 46 try: 47 ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock) 48 except IOError, e: 49 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that 50 pass 51 else: 52 raise 53 54# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 55def skip_if_broken_ubuntu_ssl(func): 56 if hasattr(ssl, 'PROTOCOL_SSLv2'): 57 # We need to access the lower-level wrapper in order to create an 58 # implicit SSL context without trying to connect or listen. 59 try: 60 import _ssl 61 except ImportError: 62 # The returned function won't get executed, just ignore the error 63 pass 64 @functools.wraps(func) 65 def f(*args, **kwargs): 66 try: 67 s = socket.socket(socket.AF_INET) 68 _ssl.sslwrap(s._sock, 0, None, None, 69 ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None) 70 except ssl.SSLError as e: 71 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and 72 platform.linux_distribution() == ('debian', 'squeeze/sid', '') 73 and 'Invalid SSL protocol variant specified' in str(e)): 74 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") 75 return func(*args, **kwargs) 76 return f 77 else: 78 return func 79 80 81class BasicSocketTests(unittest.TestCase): 82 83 def test_constants(self): 84 #ssl.PROTOCOL_SSLv2 85 ssl.PROTOCOL_SSLv23 86 ssl.PROTOCOL_SSLv3 87 ssl.PROTOCOL_TLSv1 88 ssl.CERT_NONE 89 ssl.CERT_OPTIONAL 90 ssl.CERT_REQUIRED 91 92 def test_random(self): 93 v = ssl.RAND_status() 94 if test_support.verbose: 95 sys.stdout.write("\n RAND_status is %d (%s)\n" 96 % (v, (v and "sufficient randomness") or 97 "insufficient randomness")) 98 self.assertRaises(TypeError, ssl.RAND_egd, 1) 99 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) 100 ssl.RAND_add("this is a random string", 75.0) 101 102 def test_parse_cert(self): 103 # note that this uses an 'unofficial' function in _ssl.c, 104 # provided solely for this test, to exercise the certificate 105 # parsing code 106 p = ssl._ssl._test_decode_cert(CERTFILE, False) 107 if test_support.verbose: 108 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 109 self.assertEqual(p['subject'], 110 ((('countryName', 'XY'),), 111 (('localityName', 'Castle Anthrax'),), 112 (('organizationName', 'Python Software Foundation'),), 113 (('commonName', 'localhost'),)) 114 ) 115 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) 116 # Issue #13034: the subjectAltName in some certificates 117 # (notably projects.developer.nokia.com:443) wasn't parsed 118 p = ssl._ssl._test_decode_cert(NOKIACERT) 119 if test_support.verbose: 120 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 121 self.assertEqual(p['subjectAltName'], 122 (('DNS', 'projects.developer.nokia.com'), 123 ('DNS', 'projects.forum.nokia.com')) 124 ) 125 126 def test_DER_to_PEM(self): 127 with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f: 128 pem = f.read() 129 d1 = ssl.PEM_cert_to_DER_cert(pem) 130 p2 = ssl.DER_cert_to_PEM_cert(d1) 131 d2 = ssl.PEM_cert_to_DER_cert(p2) 132 self.assertEqual(d1, d2) 133 if not p2.startswith(ssl.PEM_HEADER + '\n'): 134 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) 135 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): 136 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) 137 138 def test_openssl_version(self): 139 n = ssl.OPENSSL_VERSION_NUMBER 140 t = ssl.OPENSSL_VERSION_INFO 141 s = ssl.OPENSSL_VERSION 142 self.assertIsInstance(n, (int, long)) 143 self.assertIsInstance(t, tuple) 144 self.assertIsInstance(s, str) 145 # Some sanity checks follow 146 # >= 0.9 147 self.assertGreaterEqual(n, 0x900000) 148 # < 2.0 149 self.assertLess(n, 0x20000000) 150 major, minor, fix, patch, status = t 151 self.assertGreaterEqual(major, 0) 152 self.assertLess(major, 2) 153 self.assertGreaterEqual(minor, 0) 154 self.assertLess(minor, 256) 155 self.assertGreaterEqual(fix, 0) 156 self.assertLess(fix, 256) 157 self.assertGreaterEqual(patch, 0) 158 self.assertLessEqual(patch, 26) 159 self.assertGreaterEqual(status, 0) 160 self.assertLessEqual(status, 15) 161 # Version string as returned by OpenSSL, the format might change 162 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), 163 (s, t)) 164 165 def test_ciphers(self): 166 if not test_support.is_resource_enabled('network'): 167 return 168 remote = ("svn.python.org", 443) 169 with test_support.transient_internet(remote[0]): 170 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 171 cert_reqs=ssl.CERT_NONE, ciphers="ALL") 172 s.connect(remote) 173 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 174 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") 175 s.connect(remote) 176 # Error checking occurs when connecting, because the SSL context 177 # isn't created before. 178 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 179 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") 180 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): 181 s.connect(remote) 182 183 @test_support.cpython_only 184 def test_refcycle(self): 185 # Issue #7943: an SSL object doesn't create reference cycles with 186 # itself. 187 s = socket.socket(socket.AF_INET) 188 ss = ssl.wrap_socket(s) 189 wr = weakref.ref(ss) 190 del ss 191 self.assertEqual(wr(), None) 192 193 def test_wrapped_unconnected(self): 194 # The _delegate_methods in socket.py are correctly delegated to by an 195 # unconnected SSLSocket, so they will raise a socket.error rather than 196 # something unexpected like TypeError. 197 s = socket.socket(socket.AF_INET) 198 ss = ssl.wrap_socket(s) 199 self.assertRaises(socket.error, ss.recv, 1) 200 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) 201 self.assertRaises(socket.error, ss.recvfrom, 1) 202 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) 203 self.assertRaises(socket.error, ss.send, b'x') 204 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) 205 206 207class NetworkedTests(unittest.TestCase): 208 209 def test_connect(self): 210 with test_support.transient_internet("svn.python.org"): 211 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 212 cert_reqs=ssl.CERT_NONE) 213 s.connect(("svn.python.org", 443)) 214 c = s.getpeercert() 215 if c: 216 self.fail("Peer cert %s shouldn't be here!") 217 s.close() 218 219 # this should fail because we have no verification certs 220 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 221 cert_reqs=ssl.CERT_REQUIRED) 222 try: 223 s.connect(("svn.python.org", 443)) 224 except ssl.SSLError: 225 pass 226 finally: 227 s.close() 228 229 # this should succeed because we specify the root cert 230 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 231 cert_reqs=ssl.CERT_REQUIRED, 232 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 233 try: 234 s.connect(("svn.python.org", 443)) 235 finally: 236 s.close() 237 238 def test_connect_ex(self): 239 # Issue #11326: check connect_ex() implementation 240 with test_support.transient_internet("svn.python.org"): 241 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 242 cert_reqs=ssl.CERT_REQUIRED, 243 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 244 try: 245 self.assertEqual(0, s.connect_ex(("svn.python.org", 443))) 246 self.assertTrue(s.getpeercert()) 247 finally: 248 s.close() 249 250 def test_non_blocking_connect_ex(self): 251 # Issue #11326: non-blocking connect_ex() should allow handshake 252 # to proceed after the socket gets ready. 253 with test_support.transient_internet("svn.python.org"): 254 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 255 cert_reqs=ssl.CERT_REQUIRED, 256 ca_certs=SVN_PYTHON_ORG_ROOT_CERT, 257 do_handshake_on_connect=False) 258 try: 259 s.setblocking(False) 260 rc = s.connect_ex(('svn.python.org', 443)) 261 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere 262 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) 263 # Wait for connect to finish 264 select.select([], [s], [], 5.0) 265 # Non-blocking handshake 266 while True: 267 try: 268 s.do_handshake() 269 break 270 except ssl.SSLError as err: 271 if err.args[0] == ssl.SSL_ERROR_WANT_READ: 272 select.select([s], [], [], 5.0) 273 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: 274 select.select([], [s], [], 5.0) 275 else: 276 raise 277 # SSL established 278 self.assertTrue(s.getpeercert()) 279 finally: 280 s.close() 281 282 def test_timeout_connect_ex(self): 283 # Issue #12065: on a timeout, connect_ex() should return the original 284 # errno (mimicking the behaviour of non-SSL sockets). 285 with test_support.transient_internet("svn.python.org"): 286 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 287 cert_reqs=ssl.CERT_REQUIRED, 288 ca_certs=SVN_PYTHON_ORG_ROOT_CERT, 289 do_handshake_on_connect=False) 290 try: 291 s.settimeout(0.0000001) 292 rc = s.connect_ex(('svn.python.org', 443)) 293 if rc == 0: 294 self.skipTest("svn.python.org responded too quickly") 295 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) 296 finally: 297 s.close() 298 299 def test_connect_ex_error(self): 300 with test_support.transient_internet("svn.python.org"): 301 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 302 cert_reqs=ssl.CERT_REQUIRED, 303 ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 304 try: 305 self.assertEqual(errno.ECONNREFUSED, 306 s.connect_ex(("svn.python.org", 444))) 307 finally: 308 s.close() 309 310 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") 311 def test_makefile_close(self): 312 # Issue #5238: creating a file-like object with makefile() shouldn't 313 # delay closing the underlying "real socket" (here tested with its 314 # file descriptor, hence skipping the test under Windows). 315 with test_support.transient_internet("svn.python.org"): 316 ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) 317 ss.connect(("svn.python.org", 443)) 318 fd = ss.fileno() 319 f = ss.makefile() 320 f.close() 321 # The fd is still open 322 os.read(fd, 0) 323 # Closing the SSL socket should close the fd too 324 ss.close() 325 gc.collect() 326 with self.assertRaises(OSError) as e: 327 os.read(fd, 0) 328 self.assertEqual(e.exception.errno, errno.EBADF) 329 330 def test_non_blocking_handshake(self): 331 with test_support.transient_internet("svn.python.org"): 332 s = socket.socket(socket.AF_INET) 333 s.connect(("svn.python.org", 443)) 334 s.setblocking(False) 335 s = ssl.wrap_socket(s, 336 cert_reqs=ssl.CERT_NONE, 337 do_handshake_on_connect=False) 338 count = 0 339 while True: 340 try: 341 count += 1 342 s.do_handshake() 343 break 344 except ssl.SSLError, err: 345 if err.args[0] == ssl.SSL_ERROR_WANT_READ: 346 select.select([s], [], []) 347 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: 348 select.select([], [s], []) 349 else: 350 raise 351 s.close() 352 if test_support.verbose: 353 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) 354 355 def test_get_server_certificate(self): 356 with test_support.transient_internet("svn.python.org"): 357 pem = ssl.get_server_certificate(("svn.python.org", 443)) 358 if not pem: 359 self.fail("No server certificate on svn.python.org:443!") 360 361 try: 362 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) 363 except ssl.SSLError: 364 #should fail 365 pass 366 else: 367 self.fail("Got server certificate %s for svn.python.org!" % pem) 368 369 pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) 370 if not pem: 371 self.fail("No server certificate on svn.python.org:443!") 372 if test_support.verbose: 373 sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) 374 375 def test_algorithms(self): 376 # Issue #8484: all algorithms should be available when verifying a 377 # certificate. 378 # SHA256 was added in OpenSSL 0.9.8 379 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): 380 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) 381 self.skipTest("remote host needs SNI, only available on Python 3.2+") 382 # NOTE: https://sha2.hboeck.de is another possible test host 383 remote = ("sha256.tbs-internet.com", 443) 384 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") 385 with test_support.transient_internet("sha256.tbs-internet.com"): 386 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 387 cert_reqs=ssl.CERT_REQUIRED, 388 ca_certs=sha256_cert,) 389 try: 390 s.connect(remote) 391 if test_support.verbose: 392 sys.stdout.write("\nCipher with %r is %r\n" % 393 (remote, s.cipher())) 394 sys.stdout.write("Certificate is:\n%s\n" % 395 pprint.pformat(s.getpeercert())) 396 finally: 397 s.close() 398 399 400try: 401 import threading 402except ImportError: 403 _have_threads = False 404else: 405 _have_threads = True 406 407 class ThreadedEchoServer(threading.Thread): 408 409 class ConnectionHandler(threading.Thread): 410 411 """A mildly complicated class, because we want it to work both 412 with and without the SSL wrapper around the socket connection, so 413 that we can test the STARTTLS functionality.""" 414 415 def __init__(self, server, connsock): 416 self.server = server 417 self.running = False 418 self.sock = connsock 419 self.sock.setblocking(1) 420 self.sslconn = None 421 threading.Thread.__init__(self) 422 self.daemon = True 423 424 def show_conn_details(self): 425 if self.server.certreqs == ssl.CERT_REQUIRED: 426 cert = self.sslconn.getpeercert() 427 if test_support.verbose and self.server.chatty: 428 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") 429 cert_binary = self.sslconn.getpeercert(True) 430 if test_support.verbose and self.server.chatty: 431 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") 432 cipher = self.sslconn.cipher() 433 if test_support.verbose and self.server.chatty: 434 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") 435 436 def wrap_conn(self): 437 try: 438 self.sslconn = ssl.wrap_socket(self.sock, server_side=True, 439 certfile=self.server.certificate, 440 ssl_version=self.server.protocol, 441 ca_certs=self.server.cacerts, 442 cert_reqs=self.server.certreqs, 443 ciphers=self.server.ciphers) 444 except ssl.SSLError as e: 445 # XXX Various errors can have happened here, for example 446 # a mismatching protocol version, an invalid certificate, 447 # or a low-level bug. This should be made more discriminating. 448 self.server.conn_errors.append(e) 449 if self.server.chatty: 450 handle_error("\n server: bad connection attempt from " + 451 str(self.sock.getpeername()) + ":\n") 452 self.close() 453 self.running = False 454 self.server.stop() 455 return False 456 else: 457 return True 458 459 def read(self): 460 if self.sslconn: 461 return self.sslconn.read() 462 else: 463 return self.sock.recv(1024) 464 465 def write(self, bytes): 466 if self.sslconn: 467 return self.sslconn.write(bytes) 468 else: 469 return self.sock.send(bytes) 470 471 def close(self): 472 if self.sslconn: 473 self.sslconn.close() 474 else: 475 self.sock._sock.close() 476 477 def run(self): 478 self.running = True 479 if not self.server.starttls_server: 480 if isinstance(self.sock, ssl.SSLSocket): 481 self.sslconn = self.sock 482 elif not self.wrap_conn(): 483 return 484 self.show_conn_details() 485 while self.running: 486 try: 487 msg = self.read() 488 if not msg: 489 # eof, so quit this handler 490 self.running = False 491 self.close() 492 elif msg.strip() == 'over': 493 if test_support.verbose and self.server.connectionchatty: 494 sys.stdout.write(" server: client closed connection\n") 495 self.close() 496 return 497 elif self.server.starttls_server and msg.strip() == 'STARTTLS': 498 if test_support.verbose and self.server.connectionchatty: 499 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n") 500 self.write("OK\n") 501 if not self.wrap_conn(): 502 return 503 elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS': 504 if test_support.verbose and self.server.connectionchatty: 505 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n") 506 self.write("OK\n") 507 self.sslconn.unwrap() 508 self.sslconn = None 509 if test_support.verbose and self.server.connectionchatty: 510 sys.stdout.write(" server: connection is now unencrypted...\n") 511 else: 512 if (test_support.verbose and 513 self.server.connectionchatty): 514 ctype = (self.sslconn and "encrypted") or "unencrypted" 515 sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n" 516 % (repr(msg), ctype, repr(msg.lower()), ctype)) 517 self.write(msg.lower()) 518 except ssl.SSLError: 519 if self.server.chatty: 520 handle_error("Test server failure:\n") 521 self.close() 522 self.running = False 523 # normally, we'd just stop here, but for the test 524 # harness, we want to stop the server 525 self.server.stop() 526 527 def __init__(self, certificate, ssl_version=None, 528 certreqs=None, cacerts=None, 529 chatty=True, connectionchatty=False, starttls_server=False, 530 wrap_accepting_socket=False, ciphers=None): 531 532 if ssl_version is None: 533 ssl_version = ssl.PROTOCOL_TLSv1 534 if certreqs is None: 535 certreqs = ssl.CERT_NONE 536 self.certificate = certificate 537 self.protocol = ssl_version 538 self.certreqs = certreqs 539 self.cacerts = cacerts 540 self.ciphers = ciphers 541 self.chatty = chatty 542 self.connectionchatty = connectionchatty 543 self.starttls_server = starttls_server 544 self.sock = socket.socket() 545 self.flag = None 546 if wrap_accepting_socket: 547 self.sock = ssl.wrap_socket(self.sock, server_side=True, 548 certfile=self.certificate, 549 cert_reqs = self.certreqs, 550 ca_certs = self.cacerts, 551 ssl_version = self.protocol, 552 ciphers = self.ciphers) 553 if test_support.verbose and self.chatty: 554 sys.stdout.write(' server: wrapped server socket as %s\n' % str(self.sock)) 555 self.port = test_support.bind_port(self.sock) 556 self.active = False 557 self.conn_errors = [] 558 threading.Thread.__init__(self) 559 self.daemon = True 560 561 def __enter__(self): 562 self.start(threading.Event()) 563 self.flag.wait() 564 return self 565 566 def __exit__(self, *args): 567 self.stop() 568 self.join() 569 570 def start(self, flag=None): 571 self.flag = flag 572 threading.Thread.start(self) 573 574 def run(self): 575 self.sock.settimeout(0.05) 576 self.sock.listen(5) 577 self.active = True 578 if self.flag: 579 # signal an event 580 self.flag.set() 581 while self.active: 582 try: 583 newconn, connaddr = self.sock.accept() 584 if test_support.verbose and self.chatty: 585 sys.stdout.write(' server: new connection from ' 586 + str(connaddr) + '\n') 587 handler = self.ConnectionHandler(self, newconn) 588 handler.start() 589 handler.join() 590 except socket.timeout: 591 pass 592 except KeyboardInterrupt: 593 self.stop() 594 self.sock.close() 595 596 def stop(self): 597 self.active = False 598 599 class AsyncoreEchoServer(threading.Thread): 600 601 class EchoServer(asyncore.dispatcher): 602 603 class ConnectionHandler(asyncore.dispatcher_with_send): 604 605 def __init__(self, conn, certfile): 606 asyncore.dispatcher_with_send.__init__(self, conn) 607 self.socket = ssl.wrap_socket(conn, server_side=True, 608 certfile=certfile, 609 do_handshake_on_connect=False) 610 self._ssl_accepting = True 611 612 def readable(self): 613 if isinstance(self.socket, ssl.SSLSocket): 614 while self.socket.pending() > 0: 615 self.handle_read_event() 616 return True 617 618 def _do_ssl_handshake(self): 619 try: 620 self.socket.do_handshake() 621 except ssl.SSLError, err: 622 if err.args[0] in (ssl.SSL_ERROR_WANT_READ, 623 ssl.SSL_ERROR_WANT_WRITE): 624 return 625 elif err.args[0] == ssl.SSL_ERROR_EOF: 626 return self.handle_close() 627 raise 628 except socket.error, err: 629 if err.args[0] == errno.ECONNABORTED: 630 return self.handle_close() 631 else: 632 self._ssl_accepting = False 633 634 def handle_read(self): 635 if self._ssl_accepting: 636 self._do_ssl_handshake() 637 else: 638 data = self.recv(1024) 639 if data and data.strip() != 'over': 640 self.send(data.lower()) 641 642 def handle_close(self): 643 self.close() 644 if test_support.verbose: 645 sys.stdout.write(" server: closed connection %s\n" % self.socket) 646 647 def handle_error(self): 648 raise 649 650 def __init__(self, certfile): 651 self.certfile = certfile 652 asyncore.dispatcher.__init__(self) 653 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 654 self.port = test_support.bind_port(self.socket) 655 self.listen(5) 656 657 def handle_accept(self): 658 sock_obj, addr = self.accept() 659 if test_support.verbose: 660 sys.stdout.write(" server: new connection from %s:%s\n" %addr) 661 self.ConnectionHandler(sock_obj, self.certfile) 662 663 def handle_error(self): 664 raise 665 666 def __init__(self, certfile): 667 self.flag = None 668 self.active = False 669 self.server = self.EchoServer(certfile) 670 self.port = self.server.port 671 threading.Thread.__init__(self) 672 self.daemon = True 673 674 def __str__(self): 675 return "<%s %s>" % (self.__class__.__name__, self.server) 676 677 def __enter__(self): 678 self.start(threading.Event()) 679 self.flag.wait() 680 return self 681 682 def __exit__(self, *args): 683 if test_support.verbose: 684 sys.stdout.write(" cleanup: stopping server.\n") 685 self.stop() 686 if test_support.verbose: 687 sys.stdout.write(" cleanup: joining server thread.\n") 688 self.join() 689 if test_support.verbose: 690 sys.stdout.write(" cleanup: successfully joined.\n") 691 692 def start(self, flag=None): 693 self.flag = flag 694 threading.Thread.start(self) 695 696 def run(self): 697 self.active = True 698 if self.flag: 699 self.flag.set() 700 while self.active: 701 asyncore.loop(0.05) 702 703 def stop(self): 704 self.active = False 705 self.server.close() 706 707 class SocketServerHTTPSServer(threading.Thread): 708 709 class HTTPSServer(HTTPServer): 710 711 def __init__(self, server_address, RequestHandlerClass, certfile): 712 HTTPServer.__init__(self, server_address, RequestHandlerClass) 713 # we assume the certfile contains both private key and certificate 714 self.certfile = certfile 715 self.allow_reuse_address = True 716 717 def __str__(self): 718 return ('<%s %s:%s>' % 719 (self.__class__.__name__, 720 self.server_name, 721 self.server_port)) 722 723 def get_request(self): 724 # override this to wrap socket with SSL 725 sock, addr = self.socket.accept() 726 sslconn = ssl.wrap_socket(sock, server_side=True, 727 certfile=self.certfile) 728 return sslconn, addr 729 730 class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): 731 # need to override translate_path to get a known root, 732 # instead of using os.curdir, since the test could be 733 # run from anywhere 734 735 server_version = "TestHTTPS/1.0" 736 737 root = None 738 739 def translate_path(self, path): 740 """Translate a /-separated PATH to the local filename syntax. 741 742 Components that mean special things to the local file system 743 (e.g. drive or directory names) are ignored. (XXX They should 744 probably be diagnosed.) 745 746 """ 747 # abandon query parameters 748 path = urlparse.urlparse(path)[2] 749 path = os.path.normpath(urllib.unquote(path)) 750 words = path.split('/') 751 words = filter(None, words) 752 path = self.root 753 for word in words: 754 drive, word = os.path.splitdrive(word) 755 head, word = os.path.split(word) 756 if word in self.root: continue 757 path = os.path.join(path, word) 758 return path 759 760 def log_message(self, format, *args): 761 762 # we override this to suppress logging unless "verbose" 763 764 if test_support.verbose: 765 sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" % 766 (self.server.server_address, 767 self.server.server_port, 768 self.request.cipher(), 769 self.log_date_time_string(), 770 format%args)) 771 772 773 def __init__(self, certfile): 774 self.flag = None 775 self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] 776 self.server = self.HTTPSServer( 777 (HOST, 0), self.RootedHTTPRequestHandler, certfile) 778 self.port = self.server.server_port 779 threading.Thread.__init__(self) 780 self.daemon = True 781 782 def __str__(self): 783 return "<%s %s>" % (self.__class__.__name__, self.server) 784 785 def start(self, flag=None): 786 self.flag = flag 787 threading.Thread.start(self) 788 789 def run(self): 790 if self.flag: 791 self.flag.set() 792 self.server.serve_forever(0.05) 793 794 def stop(self): 795 self.server.shutdown() 796 797 798 def bad_cert_test(certfile): 799 """ 800 Launch a server with CERT_REQUIRED, and check that trying to 801 connect to it with the given client certificate fails. 802 """ 803 server = ThreadedEchoServer(CERTFILE, 804 certreqs=ssl.CERT_REQUIRED, 805 cacerts=CERTFILE, chatty=False) 806 with server: 807 try: 808 s = ssl.wrap_socket(socket.socket(), 809 certfile=certfile, 810 ssl_version=ssl.PROTOCOL_TLSv1) 811 s.connect((HOST, server.port)) 812 except ssl.SSLError, x: 813 if test_support.verbose: 814 sys.stdout.write("\nSSLError is %s\n" % x[1]) 815 except socket.error, x: 816 if test_support.verbose: 817 sys.stdout.write("\nsocket.error is %s\n" % x[1]) 818 else: 819 raise AssertionError("Use of invalid cert should have failed!") 820 821 def server_params_test(certfile, protocol, certreqs, cacertsfile, 822 client_certfile, client_protocol=None, indata="FOO\n", 823 ciphers=None, chatty=True, connectionchatty=False, 824 wrap_accepting_socket=False): 825 """ 826 Launch a server, connect a client to it and try various reads 827 and writes. 828 """ 829 server = ThreadedEchoServer(certfile, 830 certreqs=certreqs, 831 ssl_version=protocol, 832 cacerts=cacertsfile, 833 ciphers=ciphers, 834 chatty=chatty, 835 connectionchatty=connectionchatty, 836 wrap_accepting_socket=wrap_accepting_socket) 837 with server: 838 # try to connect 839 if client_protocol is None: 840 client_protocol = protocol 841 s = ssl.wrap_socket(socket.socket(), 842 certfile=client_certfile, 843 ca_certs=cacertsfile, 844 ciphers=ciphers, 845 cert_reqs=certreqs, 846 ssl_version=client_protocol) 847 s.connect((HOST, server.port)) 848 for arg in [indata, bytearray(indata), memoryview(indata)]: 849 if connectionchatty: 850 if test_support.verbose: 851 sys.stdout.write( 852 " client: sending %s...\n" % (repr(arg))) 853 s.write(arg) 854 outdata = s.read() 855 if connectionchatty: 856 if test_support.verbose: 857 sys.stdout.write(" client: read %s\n" % repr(outdata)) 858 if outdata != indata.lower(): 859 raise AssertionError( 860 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" 861 % (outdata[:min(len(outdata),20)], len(outdata), 862 indata[:min(len(indata),20)].lower(), len(indata))) 863 s.write("over\n") 864 if connectionchatty: 865 if test_support.verbose: 866 sys.stdout.write(" client: closing connection.\n") 867 s.close() 868 869 def try_protocol_combo(server_protocol, 870 client_protocol, 871 expect_success, 872 certsreqs=None): 873 if certsreqs is None: 874 certsreqs = ssl.CERT_NONE 875 certtype = { 876 ssl.CERT_NONE: "CERT_NONE", 877 ssl.CERT_OPTIONAL: "CERT_OPTIONAL", 878 ssl.CERT_REQUIRED: "CERT_REQUIRED", 879 }[certsreqs] 880 if test_support.verbose: 881 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" 882 sys.stdout.write(formatstr % 883 (ssl.get_protocol_name(client_protocol), 884 ssl.get_protocol_name(server_protocol), 885 certtype)) 886 try: 887 # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client 888 # will send an SSLv3 hello (rather than SSLv2) starting from 889 # OpenSSL 1.0.0 (see issue #8322). 890 server_params_test(CERTFILE, server_protocol, certsreqs, 891 CERTFILE, CERTFILE, client_protocol, 892 ciphers="ALL", chatty=False) 893 # Protocol mismatch can result in either an SSLError, or a 894 # "Connection reset by peer" error. 895 except ssl.SSLError: 896 if expect_success: 897 raise 898 except socket.error as e: 899 if expect_success or e.errno != errno.ECONNRESET: 900 raise 901 else: 902 if not expect_success: 903 raise AssertionError( 904 "Client protocol %s succeeded with server protocol %s!" 905 % (ssl.get_protocol_name(client_protocol), 906 ssl.get_protocol_name(server_protocol))) 907 908 909 class ThreadedTests(unittest.TestCase): 910 911 def test_rude_shutdown(self): 912 """A brutal shutdown of an SSL server should raise an IOError 913 in the client when attempting handshake. 914 """ 915 listener_ready = threading.Event() 916 listener_gone = threading.Event() 917 918 s = socket.socket() 919 port = test_support.bind_port(s, HOST) 920 921 # `listener` runs in a thread. It sits in an accept() until 922 # the main thread connects. Then it rudely closes the socket, 923 # and sets Event `listener_gone` to let the main thread know 924 # the socket is gone. 925 def listener(): 926 s.listen(5) 927 listener_ready.set() 928 s.accept() 929 s.close() 930 listener_gone.set() 931 932 def connector(): 933 listener_ready.wait() 934 c = socket.socket() 935 c.connect((HOST, port)) 936 listener_gone.wait() 937 try: 938 ssl_sock = ssl.wrap_socket(c) 939 except IOError: 940 pass 941 else: 942 self.fail('connecting to closed SSL socket should have failed') 943 944 t = threading.Thread(target=listener) 945 t.start() 946 try: 947 connector() 948 finally: 949 t.join() 950 951 @skip_if_broken_ubuntu_ssl 952 def test_echo(self): 953 """Basic test of an SSL client connecting to a server""" 954 if test_support.verbose: 955 sys.stdout.write("\n") 956 server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE, 957 CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1, 958 chatty=True, connectionchatty=True) 959 960 def test_getpeercert(self): 961 if test_support.verbose: 962 sys.stdout.write("\n") 963 s2 = socket.socket() 964 server = ThreadedEchoServer(CERTFILE, 965 certreqs=ssl.CERT_NONE, 966 ssl_version=ssl.PROTOCOL_SSLv23, 967 cacerts=CERTFILE, 968 chatty=False) 969 with server: 970 s = ssl.wrap_socket(socket.socket(), 971 certfile=CERTFILE, 972 ca_certs=CERTFILE, 973 cert_reqs=ssl.CERT_REQUIRED, 974 ssl_version=ssl.PROTOCOL_SSLv23) 975 s.connect((HOST, server.port)) 976 cert = s.getpeercert() 977 self.assertTrue(cert, "Can't get peer certificate.") 978 cipher = s.cipher() 979 if test_support.verbose: 980 sys.stdout.write(pprint.pformat(cert) + '\n') 981 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') 982 if 'subject' not in cert: 983 self.fail("No subject field in certificate: %s." % 984 pprint.pformat(cert)) 985 if ((('organizationName', 'Python Software Foundation'),) 986 not in cert['subject']): 987 self.fail( 988 "Missing or invalid 'organizationName' field in certificate subject; " 989 "should be 'Python Software Foundation'.") 990 s.close() 991 992 def test_empty_cert(self): 993 """Connecting with an empty cert file""" 994 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 995 "nullcert.pem")) 996 def test_malformed_cert(self): 997 """Connecting with a badly formatted certificate (syntax error)""" 998 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 999 "badcert.pem")) 1000 def test_nonexisting_cert(self): 1001 """Connecting with a non-existing cert file""" 1002 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 1003 "wrongcert.pem")) 1004 def test_malformed_key(self): 1005 """Connecting with a badly formatted key (syntax error)""" 1006 bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir, 1007 "badkey.pem")) 1008 1009 @skip_if_broken_ubuntu_ssl 1010 def test_protocol_sslv2(self): 1011 """Connecting to an SSLv2 server with various client options""" 1012 if test_support.verbose: 1013 sys.stdout.write("\n") 1014 if not hasattr(ssl, 'PROTOCOL_SSLv2'): 1015 self.skipTest("PROTOCOL_SSLv2 needed") 1016 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) 1017 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) 1018 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) 1019 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True) 1020 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) 1021 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) 1022 1023 @skip_if_broken_ubuntu_ssl 1024 def test_protocol_sslv23(self): 1025 """Connecting to an SSLv23 server with various client options""" 1026 if test_support.verbose: 1027 sys.stdout.write("\n") 1028 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True) 1029 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) 1030 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True) 1031 1032 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) 1033 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) 1034 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) 1035 1036 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) 1037 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) 1038 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) 1039 1040 @skip_if_broken_ubuntu_ssl 1041 def test_protocol_sslv3(self): 1042 """Connecting to an SSLv3 server with various client options""" 1043 if test_support.verbose: 1044 sys.stdout.write("\n") 1045 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True) 1046 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL) 1047 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED) 1048 if hasattr(ssl, 'PROTOCOL_SSLv2'): 1049 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) 1050 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) 1051 1052 @skip_if_broken_ubuntu_ssl 1053 def test_protocol_tlsv1(self): 1054 """Connecting to a TLSv1 server with various client options""" 1055 if test_support.verbose: 1056 sys.stdout.write("\n") 1057 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True) 1058 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL) 1059 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED) 1060 if hasattr(ssl, 'PROTOCOL_SSLv2'): 1061 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) 1062 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) 1063 1064 def test_starttls(self): 1065 """Switching from clear text to encrypted and back again.""" 1066 msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6") 1067 1068 server = ThreadedEchoServer(CERTFILE, 1069 ssl_version=ssl.PROTOCOL_TLSv1, 1070 starttls_server=True, 1071 chatty=True, 1072 connectionchatty=True) 1073 wrapped = False 1074 with server: 1075 s = socket.socket() 1076 s.setblocking(1) 1077 s.connect((HOST, server.port)) 1078 if test_support.verbose: 1079 sys.stdout.write("\n") 1080 for indata in msgs: 1081 if test_support.verbose: 1082 sys.stdout.write( 1083 " client: sending %s...\n" % repr(indata)) 1084 if wrapped: 1085 conn.write(indata) 1086 outdata = conn.read() 1087 else: 1088 s.send(indata) 1089 outdata = s.recv(1024) 1090 if (indata == "STARTTLS" and 1091 outdata.strip().lower().startswith("ok")): 1092 # STARTTLS ok, switch to secure mode 1093 if test_support.verbose: 1094 sys.stdout.write( 1095 " client: read %s from server, starting TLS...\n" 1096 % repr(outdata)) 1097 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) 1098 wrapped = True 1099 elif (indata == "ENDTLS" and 1100 outdata.strip().lower().startswith("ok")): 1101 # ENDTLS ok, switch back to clear text 1102 if test_support.verbose: 1103 sys.stdout.write( 1104 " client: read %s from server, ending TLS...\n" 1105 % repr(outdata)) 1106 s = conn.unwrap() 1107 wrapped = False 1108 else: 1109 if test_support.verbose: 1110 sys.stdout.write( 1111 " client: read %s from server\n" % repr(outdata)) 1112 if test_support.verbose: 1113 sys.stdout.write(" client: closing connection.\n") 1114 if wrapped: 1115 conn.write("over\n") 1116 else: 1117 s.send("over\n") 1118 s.close() 1119 1120 def test_socketserver(self): 1121 """Using a SocketServer to create and manage SSL connections.""" 1122 server = SocketServerHTTPSServer(CERTFILE) 1123 flag = threading.Event() 1124 server.start(flag) 1125 # wait for it to start 1126 flag.wait() 1127 # try to connect 1128 try: 1129 if test_support.verbose: 1130 sys.stdout.write('\n') 1131 with open(CERTFILE, 'rb') as f: 1132 d1 = f.read() 1133 d2 = '' 1134 # now fetch the same data from the HTTPS server 1135 url = 'https://127.0.0.1:%d/%s' % ( 1136 server.port, os.path.split(CERTFILE)[1]) 1137 with test_support.check_py3k_warnings(): 1138 f = urllib.urlopen(url) 1139 dlen = f.info().getheader("content-length") 1140 if dlen and (int(dlen) > 0): 1141 d2 = f.read(int(dlen)) 1142 if test_support.verbose: 1143 sys.stdout.write( 1144 " client: read %d bytes from remote server '%s'\n" 1145 % (len(d2), server)) 1146 f.close() 1147 self.assertEqual(d1, d2) 1148 finally: 1149 server.stop() 1150 server.join() 1151 1152 def test_wrapped_accept(self): 1153 """Check the accept() method on SSL sockets.""" 1154 if test_support.verbose: 1155 sys.stdout.write("\n") 1156 server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED, 1157 CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23, 1158 chatty=True, connectionchatty=True, 1159 wrap_accepting_socket=True) 1160 1161 def test_asyncore_server(self): 1162 """Check the example asyncore integration.""" 1163 indata = "TEST MESSAGE of mixed case\n" 1164 1165 if test_support.verbose: 1166 sys.stdout.write("\n") 1167 server = AsyncoreEchoServer(CERTFILE) 1168 with server: 1169 s = ssl.wrap_socket(socket.socket()) 1170 s.connect(('127.0.0.1', server.port)) 1171 if test_support.verbose: 1172 sys.stdout.write( 1173 " client: sending %s...\n" % (repr(indata))) 1174 s.write(indata) 1175 outdata = s.read() 1176 if test_support.verbose: 1177 sys.stdout.write(" client: read %s\n" % repr(outdata)) 1178 if outdata != indata.lower(): 1179 self.fail( 1180 "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" 1181 % (outdata[:min(len(outdata),20)], len(outdata), 1182 indata[:min(len(indata),20)].lower(), len(indata))) 1183 s.write("over\n") 1184 if test_support.verbose: 1185 sys.stdout.write(" client: closing connection.\n") 1186 s.close() 1187 1188 def test_recv_send(self): 1189 """Test recv(), send() and friends.""" 1190 if test_support.verbose: 1191 sys.stdout.write("\n") 1192 1193 server = ThreadedEchoServer(CERTFILE, 1194 certreqs=ssl.CERT_NONE, 1195 ssl_version=ssl.PROTOCOL_TLSv1, 1196 cacerts=CERTFILE, 1197 chatty=True, 1198 connectionchatty=False) 1199 with server: 1200 s = ssl.wrap_socket(socket.socket(), 1201 server_side=False, 1202 certfile=CERTFILE, 1203 ca_certs=CERTFILE, 1204 cert_reqs=ssl.CERT_NONE, 1205 ssl_version=ssl.PROTOCOL_TLSv1) 1206 s.connect((HOST, server.port)) 1207 # helper methods for standardising recv* method signatures 1208 def _recv_into(): 1209 b = bytearray("\0"*100) 1210 count = s.recv_into(b) 1211 return b[:count] 1212 1213 def _recvfrom_into(): 1214 b = bytearray("\0"*100) 1215 count, addr = s.recvfrom_into(b) 1216 return b[:count] 1217 1218 # (name, method, whether to expect success, *args) 1219 send_methods = [ 1220 ('send', s.send, True, []), 1221 ('sendto', s.sendto, False, ["some.address"]), 1222 ('sendall', s.sendall, True, []), 1223 ] 1224 recv_methods = [ 1225 ('recv', s.recv, True, []), 1226 ('recvfrom', s.recvfrom, False, ["some.address"]), 1227 ('recv_into', _recv_into, True, []), 1228 ('recvfrom_into', _recvfrom_into, False, []), 1229 ] 1230 data_prefix = u"PREFIX_" 1231 1232 for meth_name, send_meth, expect_success, args in send_methods: 1233 indata = data_prefix + meth_name 1234 try: 1235 send_meth(indata.encode('ASCII', 'strict'), *args) 1236 outdata = s.read() 1237 outdata = outdata.decode('ASCII', 'strict') 1238 if outdata != indata.lower(): 1239 self.fail( 1240 "While sending with <<%s>> bad data " 1241 "<<%r>> (%d) received; " 1242 "expected <<%r>> (%d)\n" % ( 1243 meth_name, outdata[:20], len(outdata), 1244 indata[:20], len(indata) 1245 ) 1246 ) 1247 except ValueError as e: 1248 if expect_success: 1249 self.fail( 1250 "Failed to send with method <<%s>>; " 1251 "expected to succeed.\n" % (meth_name,) 1252 ) 1253 if not str(e).startswith(meth_name): 1254 self.fail( 1255 "Method <<%s>> failed with unexpected " 1256 "exception message: %s\n" % ( 1257 meth_name, e 1258 ) 1259 ) 1260 1261 for meth_name, recv_meth, expect_success, args in recv_methods: 1262 indata = data_prefix + meth_name 1263 try: 1264 s.send(indata.encode('ASCII', 'strict')) 1265 outdata = recv_meth(*args) 1266 outdata = outdata.decode('ASCII', 'strict') 1267 if outdata != indata.lower(): 1268 self.fail( 1269 "While receiving with <<%s>> bad data " 1270 "<<%r>> (%d) received; " 1271 "expected <<%r>> (%d)\n" % ( 1272 meth_name, outdata[:20], len(outdata), 1273 indata[:20], len(indata) 1274 ) 1275 ) 1276 except ValueError as e: 1277 if expect_success: 1278 self.fail( 1279 "Failed to receive with method <<%s>>; " 1280 "expected to succeed.\n" % (meth_name,) 1281 ) 1282 if not str(e).startswith(meth_name): 1283 self.fail( 1284 "Method <<%s>> failed with unexpected " 1285 "exception message: %s\n" % ( 1286 meth_name, e 1287 ) 1288 ) 1289 # consume data 1290 s.read() 1291 1292 s.write("over\n".encode("ASCII", "strict")) 1293 s.close() 1294 1295 def test_handshake_timeout(self): 1296 # Issue #5103: SSL handshake must respect the socket timeout 1297 server = socket.socket(socket.AF_INET) 1298 host = "127.0.0.1" 1299 port = test_support.bind_port(server) 1300 started = threading.Event() 1301 finish = False 1302 1303 def serve(): 1304 server.listen(5) 1305 started.set() 1306 conns = [] 1307 while not finish: 1308 r, w, e = select.select([server], [], [], 0.1) 1309 if server in r: 1310 # Let the socket hang around rather than having 1311 # it closed by garbage collection. 1312 conns.append(server.accept()[0]) 1313 1314 t = threading.Thread(target=serve) 1315 t.start() 1316 started.wait() 1317 1318 try: 1319 try: 1320 c = socket.socket(socket.AF_INET) 1321 c.settimeout(0.2) 1322 c.connect((host, port)) 1323 # Will attempt handshake and time out 1324 self.assertRaisesRegexp(ssl.SSLError, "timed out", 1325 ssl.wrap_socket, c) 1326 finally: 1327 c.close() 1328 try: 1329 c = socket.socket(socket.AF_INET) 1330 c.settimeout(0.2) 1331 c = ssl.wrap_socket(c) 1332 # Will attempt handshake and time out 1333 self.assertRaisesRegexp(ssl.SSLError, "timed out", 1334 c.connect, (host, port)) 1335 finally: 1336 c.close() 1337 finally: 1338 finish = True 1339 t.join() 1340 server.close() 1341 1342 def test_default_ciphers(self): 1343 with ThreadedEchoServer(CERTFILE, 1344 ssl_version=ssl.PROTOCOL_SSLv23, 1345 chatty=False) as server: 1346 sock = socket.socket() 1347 try: 1348 # Force a set of weak ciphers on our client socket 1349 try: 1350 s = ssl.wrap_socket(sock, 1351 ssl_version=ssl.PROTOCOL_SSLv23, 1352 ciphers="DES") 1353 except ssl.SSLError: 1354 self.skipTest("no DES cipher available") 1355 with self.assertRaises((OSError, ssl.SSLError)): 1356 s.connect((HOST, server.port)) 1357 finally: 1358 sock.close() 1359 self.assertIn("no shared cipher", str(server.conn_errors[0])) 1360 1361 1362def test_main(verbose=False): 1363 global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT 1364 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, 1365 "keycert.pem") 1366 SVN_PYTHON_ORG_ROOT_CERT = os.path.join( 1367 os.path.dirname(__file__) or os.curdir, 1368 "https_svn_python_org_root.pem") 1369 NOKIACERT = os.path.join(os.path.dirname(__file__) or os.curdir, 1370 "nokia.pem") 1371 1372 if (not os.path.exists(CERTFILE) or 1373 not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT) or 1374 not os.path.exists(NOKIACERT)): 1375 raise test_support.TestFailed("Can't read certificate files!") 1376 1377 tests = [BasicTests, BasicSocketTests] 1378 1379 if test_support.is_resource_enabled('network'): 1380 tests.append(NetworkedTests) 1381 1382 if _have_threads: 1383 thread_info = test_support.threading_setup() 1384 if thread_info and test_support.is_resource_enabled('network'): 1385 tests.append(ThreadedTests) 1386 1387 try: 1388 test_support.run_unittest(*tests) 1389 finally: 1390 if _have_threads: 1391 test_support.threading_cleanup(*thread_info) 1392 1393if __name__ == "__main__": 1394 test_main() 1395