1# Test the support for SSL and sockets
2
3import sys
4import unittest
5from test import test_support
6import asyncore
7import socket
8import select
9import time
10import gc
11import os
12import errno
13import pprint
14import urllib, urlparse
15import traceback
16import weakref
17import functools
18import platform
19
20from BaseHTTPServer import HTTPServer
21from SimpleHTTPServer import SimpleHTTPRequestHandler
22
23ssl = test_support.import_module("ssl")
24
25HOST = test_support.HOST
26CERTFILE = None
27SVN_PYTHON_ORG_ROOT_CERT = None
28
29def handle_error(prefix):
30    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
31    if test_support.verbose:
32        sys.stdout.write(prefix + exc_format)
33
34
35class BasicTests(unittest.TestCase):
36
37    def test_sslwrap_simple(self):
38        # A crude test for the legacy API
39        try:
40            ssl.sslwrap_simple(socket.socket(socket.AF_INET))
41        except IOError, e:
42            if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
43                pass
44            else:
45                raise
46        try:
47            ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
48        except IOError, e:
49            if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
50                pass
51            else:
52                raise
53
54# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
55def skip_if_broken_ubuntu_ssl(func):
56    if hasattr(ssl, 'PROTOCOL_SSLv2'):
57        # We need to access the lower-level wrapper in order to create an
58        # implicit SSL context without trying to connect or listen.
59        try:
60            import _ssl
61        except ImportError:
62            # The returned function won't get executed, just ignore the error
63            pass
64        @functools.wraps(func)
65        def f(*args, **kwargs):
66            try:
67                s = socket.socket(socket.AF_INET)
68                _ssl.sslwrap(s._sock, 0, None, None,
69                             ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None)
70            except ssl.SSLError as e:
71                if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
72                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')
73                    and 'Invalid SSL protocol variant specified' in str(e)):
74                    raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
75            return func(*args, **kwargs)
76        return f
77    else:
78        return func
79
80
81class BasicSocketTests(unittest.TestCase):
82
83    def test_constants(self):
84        #ssl.PROTOCOL_SSLv2
85        ssl.PROTOCOL_SSLv23
86        ssl.PROTOCOL_SSLv3
87        ssl.PROTOCOL_TLSv1
88        ssl.CERT_NONE
89        ssl.CERT_OPTIONAL
90        ssl.CERT_REQUIRED
91
92    def test_random(self):
93        v = ssl.RAND_status()
94        if test_support.verbose:
95            sys.stdout.write("\n RAND_status is %d (%s)\n"
96                             % (v, (v and "sufficient randomness") or
97                                "insufficient randomness"))
98        self.assertRaises(TypeError, ssl.RAND_egd, 1)
99        self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
100        ssl.RAND_add("this is a random string", 75.0)
101
102    def test_parse_cert(self):
103        # note that this uses an 'unofficial' function in _ssl.c,
104        # provided solely for this test, to exercise the certificate
105        # parsing code
106        p = ssl._ssl._test_decode_cert(CERTFILE, False)
107        if test_support.verbose:
108            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
109        self.assertEqual(p['subject'],
110                         ((('countryName', 'XY'),),
111                          (('localityName', 'Castle Anthrax'),),
112                          (('organizationName', 'Python Software Foundation'),),
113                          (('commonName', 'localhost'),))
114                        )
115        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
116        # Issue #13034: the subjectAltName in some certificates
117        # (notably projects.developer.nokia.com:443) wasn't parsed
118        p = ssl._ssl._test_decode_cert(NOKIACERT)
119        if test_support.verbose:
120            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
121        self.assertEqual(p['subjectAltName'],
122                         (('DNS', 'projects.developer.nokia.com'),
123                          ('DNS', 'projects.forum.nokia.com'))
124                        )
125
126    def test_DER_to_PEM(self):
127        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
128            pem = f.read()
129        d1 = ssl.PEM_cert_to_DER_cert(pem)
130        p2 = ssl.DER_cert_to_PEM_cert(d1)
131        d2 = ssl.PEM_cert_to_DER_cert(p2)
132        self.assertEqual(d1, d2)
133        if not p2.startswith(ssl.PEM_HEADER + '\n'):
134            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
135        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
136            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
137
138    def test_openssl_version(self):
139        n = ssl.OPENSSL_VERSION_NUMBER
140        t = ssl.OPENSSL_VERSION_INFO
141        s = ssl.OPENSSL_VERSION
142        self.assertIsInstance(n, (int, long))
143        self.assertIsInstance(t, tuple)
144        self.assertIsInstance(s, str)
145        # Some sanity checks follow
146        # >= 0.9
147        self.assertGreaterEqual(n, 0x900000)
148        # < 2.0
149        self.assertLess(n, 0x20000000)
150        major, minor, fix, patch, status = t
151        self.assertGreaterEqual(major, 0)
152        self.assertLess(major, 2)
153        self.assertGreaterEqual(minor, 0)
154        self.assertLess(minor, 256)
155        self.assertGreaterEqual(fix, 0)
156        self.assertLess(fix, 256)
157        self.assertGreaterEqual(patch, 0)
158        self.assertLessEqual(patch, 26)
159        self.assertGreaterEqual(status, 0)
160        self.assertLessEqual(status, 15)
161        # Version string as returned by OpenSSL, the format might change
162        self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
163                        (s, t))
164
165    def test_ciphers(self):
166        if not test_support.is_resource_enabled('network'):
167            return
168        remote = ("svn.python.org", 443)
169        with test_support.transient_internet(remote[0]):
170            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
171                                cert_reqs=ssl.CERT_NONE, ciphers="ALL")
172            s.connect(remote)
173            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
174                                cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
175            s.connect(remote)
176            # Error checking occurs when connecting, because the SSL context
177            # isn't created before.
178            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
179                                cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
180            with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
181                s.connect(remote)
182
183    @test_support.cpython_only
184    def test_refcycle(self):
185        # Issue #7943: an SSL object doesn't create reference cycles with
186        # itself.
187        s = socket.socket(socket.AF_INET)
188        ss = ssl.wrap_socket(s)
189        wr = weakref.ref(ss)
190        del ss
191        self.assertEqual(wr(), None)
192
193    def test_wrapped_unconnected(self):
194        # The _delegate_methods in socket.py are correctly delegated to by an
195        # unconnected SSLSocket, so they will raise a socket.error rather than
196        # something unexpected like TypeError.
197        s = socket.socket(socket.AF_INET)
198        ss = ssl.wrap_socket(s)
199        self.assertRaises(socket.error, ss.recv, 1)
200        self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
201        self.assertRaises(socket.error, ss.recvfrom, 1)
202        self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
203        self.assertRaises(socket.error, ss.send, b'x')
204        self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
205
206
207class NetworkedTests(unittest.TestCase):
208
209    def test_connect(self):
210        with test_support.transient_internet("svn.python.org"):
211            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
212                                cert_reqs=ssl.CERT_NONE)
213            s.connect(("svn.python.org", 443))
214            c = s.getpeercert()
215            if c:
216                self.fail("Peer cert %s shouldn't be here!")
217            s.close()
218
219            # this should fail because we have no verification certs
220            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
221                                cert_reqs=ssl.CERT_REQUIRED)
222            try:
223                s.connect(("svn.python.org", 443))
224            except ssl.SSLError:
225                pass
226            finally:
227                s.close()
228
229            # this should succeed because we specify the root cert
230            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
231                                cert_reqs=ssl.CERT_REQUIRED,
232                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
233            try:
234                s.connect(("svn.python.org", 443))
235            finally:
236                s.close()
237
238    def test_connect_ex(self):
239        # Issue #11326: check connect_ex() implementation
240        with test_support.transient_internet("svn.python.org"):
241            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
242                                cert_reqs=ssl.CERT_REQUIRED,
243                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
244            try:
245                self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
246                self.assertTrue(s.getpeercert())
247            finally:
248                s.close()
249
250    def test_non_blocking_connect_ex(self):
251        # Issue #11326: non-blocking connect_ex() should allow handshake
252        # to proceed after the socket gets ready.
253        with test_support.transient_internet("svn.python.org"):
254            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
255                                cert_reqs=ssl.CERT_REQUIRED,
256                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
257                                do_handshake_on_connect=False)
258            try:
259                s.setblocking(False)
260                rc = s.connect_ex(('svn.python.org', 443))
261                # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
262                self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
263                # Wait for connect to finish
264                select.select([], [s], [], 5.0)
265                # Non-blocking handshake
266                while True:
267                    try:
268                        s.do_handshake()
269                        break
270                    except ssl.SSLError as err:
271                        if err.args[0] == ssl.SSL_ERROR_WANT_READ:
272                            select.select([s], [], [], 5.0)
273                        elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
274                            select.select([], [s], [], 5.0)
275                        else:
276                            raise
277                # SSL established
278                self.assertTrue(s.getpeercert())
279            finally:
280                s.close()
281
282    def test_timeout_connect_ex(self):
283        # Issue #12065: on a timeout, connect_ex() should return the original
284        # errno (mimicking the behaviour of non-SSL sockets).
285        with test_support.transient_internet("svn.python.org"):
286            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
287                                cert_reqs=ssl.CERT_REQUIRED,
288                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
289                                do_handshake_on_connect=False)
290            try:
291                s.settimeout(0.0000001)
292                rc = s.connect_ex(('svn.python.org', 443))
293                if rc == 0:
294                    self.skipTest("svn.python.org responded too quickly")
295                self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
296            finally:
297                s.close()
298
299    def test_connect_ex_error(self):
300        with test_support.transient_internet("svn.python.org"):
301            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
302                                cert_reqs=ssl.CERT_REQUIRED,
303                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
304            try:
305                self.assertEqual(errno.ECONNREFUSED,
306                                 s.connect_ex(("svn.python.org", 444)))
307            finally:
308                s.close()
309
310    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
311    def test_makefile_close(self):
312        # Issue #5238: creating a file-like object with makefile() shouldn't
313        # delay closing the underlying "real socket" (here tested with its
314        # file descriptor, hence skipping the test under Windows).
315        with test_support.transient_internet("svn.python.org"):
316            ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
317            ss.connect(("svn.python.org", 443))
318            fd = ss.fileno()
319            f = ss.makefile()
320            f.close()
321            # The fd is still open
322            os.read(fd, 0)
323            # Closing the SSL socket should close the fd too
324            ss.close()
325            gc.collect()
326            with self.assertRaises(OSError) as e:
327                os.read(fd, 0)
328            self.assertEqual(e.exception.errno, errno.EBADF)
329
330    def test_non_blocking_handshake(self):
331        with test_support.transient_internet("svn.python.org"):
332            s = socket.socket(socket.AF_INET)
333            s.connect(("svn.python.org", 443))
334            s.setblocking(False)
335            s = ssl.wrap_socket(s,
336                                cert_reqs=ssl.CERT_NONE,
337                                do_handshake_on_connect=False)
338            count = 0
339            while True:
340                try:
341                    count += 1
342                    s.do_handshake()
343                    break
344                except ssl.SSLError, err:
345                    if err.args[0] == ssl.SSL_ERROR_WANT_READ:
346                        select.select([s], [], [])
347                    elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
348                        select.select([], [s], [])
349                    else:
350                        raise
351            s.close()
352            if test_support.verbose:
353                sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
354
355    def test_get_server_certificate(self):
356        with test_support.transient_internet("svn.python.org"):
357            pem = ssl.get_server_certificate(("svn.python.org", 443))
358            if not pem:
359                self.fail("No server certificate on svn.python.org:443!")
360
361            try:
362                pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
363            except ssl.SSLError:
364                #should fail
365                pass
366            else:
367                self.fail("Got server certificate %s for svn.python.org!" % pem)
368
369            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
370            if not pem:
371                self.fail("No server certificate on svn.python.org:443!")
372            if test_support.verbose:
373                sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
374
375    def test_algorithms(self):
376        # Issue #8484: all algorithms should be available when verifying a
377        # certificate.
378        # SHA256 was added in OpenSSL 0.9.8
379        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
380            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
381        self.skipTest("remote host needs SNI, only available on Python 3.2+")
382        # NOTE: https://sha2.hboeck.de is another possible test host
383        remote = ("sha256.tbs-internet.com", 443)
384        sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
385        with test_support.transient_internet("sha256.tbs-internet.com"):
386            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
387                                cert_reqs=ssl.CERT_REQUIRED,
388                                ca_certs=sha256_cert,)
389            try:
390                s.connect(remote)
391                if test_support.verbose:
392                    sys.stdout.write("\nCipher with %r is %r\n" %
393                                     (remote, s.cipher()))
394                    sys.stdout.write("Certificate is:\n%s\n" %
395                                     pprint.pformat(s.getpeercert()))
396            finally:
397                s.close()
398
399
400try:
401    import threading
402except ImportError:
403    _have_threads = False
404else:
405    _have_threads = True
406
407    class ThreadedEchoServer(threading.Thread):
408
409        class ConnectionHandler(threading.Thread):
410
411            """A mildly complicated class, because we want it to work both
412            with and without the SSL wrapper around the socket connection, so
413            that we can test the STARTTLS functionality."""
414
415            def __init__(self, server, connsock):
416                self.server = server
417                self.running = False
418                self.sock = connsock
419                self.sock.setblocking(1)
420                self.sslconn = None
421                threading.Thread.__init__(self)
422                self.daemon = True
423
424            def show_conn_details(self):
425                if self.server.certreqs == ssl.CERT_REQUIRED:
426                    cert = self.sslconn.getpeercert()
427                    if test_support.verbose and self.server.chatty:
428                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
429                    cert_binary = self.sslconn.getpeercert(True)
430                    if test_support.verbose and self.server.chatty:
431                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
432                cipher = self.sslconn.cipher()
433                if test_support.verbose and self.server.chatty:
434                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
435
436            def wrap_conn(self):
437                try:
438                    self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
439                                                   certfile=self.server.certificate,
440                                                   ssl_version=self.server.protocol,
441                                                   ca_certs=self.server.cacerts,
442                                                   cert_reqs=self.server.certreqs,
443                                                   ciphers=self.server.ciphers)
444                except ssl.SSLError as e:
445                    # XXX Various errors can have happened here, for example
446                    # a mismatching protocol version, an invalid certificate,
447                    # or a low-level bug. This should be made more discriminating.
448                    self.server.conn_errors.append(e)
449                    if self.server.chatty:
450                        handle_error("\n server:  bad connection attempt from " +
451                                     str(self.sock.getpeername()) + ":\n")
452                    self.close()
453                    self.running = False
454                    self.server.stop()
455                    return False
456                else:
457                    return True
458
459            def read(self):
460                if self.sslconn:
461                    return self.sslconn.read()
462                else:
463                    return self.sock.recv(1024)
464
465            def write(self, bytes):
466                if self.sslconn:
467                    return self.sslconn.write(bytes)
468                else:
469                    return self.sock.send(bytes)
470
471            def close(self):
472                if self.sslconn:
473                    self.sslconn.close()
474                else:
475                    self.sock._sock.close()
476
477            def run(self):
478                self.running = True
479                if not self.server.starttls_server:
480                    if isinstance(self.sock, ssl.SSLSocket):
481                        self.sslconn = self.sock
482                    elif not self.wrap_conn():
483                        return
484                    self.show_conn_details()
485                while self.running:
486                    try:
487                        msg = self.read()
488                        if not msg:
489                            # eof, so quit this handler
490                            self.running = False
491                            self.close()
492                        elif msg.strip() == 'over':
493                            if test_support.verbose and self.server.connectionchatty:
494                                sys.stdout.write(" server: client closed connection\n")
495                            self.close()
496                            return
497                        elif self.server.starttls_server and msg.strip() == 'STARTTLS':
498                            if test_support.verbose and self.server.connectionchatty:
499                                sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
500                            self.write("OK\n")
501                            if not self.wrap_conn():
502                                return
503                        elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS':
504                            if test_support.verbose and self.server.connectionchatty:
505                                sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
506                            self.write("OK\n")
507                            self.sslconn.unwrap()
508                            self.sslconn = None
509                            if test_support.verbose and self.server.connectionchatty:
510                                sys.stdout.write(" server: connection is now unencrypted...\n")
511                        else:
512                            if (test_support.verbose and
513                                self.server.connectionchatty):
514                                ctype = (self.sslconn and "encrypted") or "unencrypted"
515                                sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
516                                                 % (repr(msg), ctype, repr(msg.lower()), ctype))
517                            self.write(msg.lower())
518                    except ssl.SSLError:
519                        if self.server.chatty:
520                            handle_error("Test server failure:\n")
521                        self.close()
522                        self.running = False
523                        # normally, we'd just stop here, but for the test
524                        # harness, we want to stop the server
525                        self.server.stop()
526
527        def __init__(self, certificate, ssl_version=None,
528                     certreqs=None, cacerts=None,
529                     chatty=True, connectionchatty=False, starttls_server=False,
530                     wrap_accepting_socket=False, ciphers=None):
531
532            if ssl_version is None:
533                ssl_version = ssl.PROTOCOL_TLSv1
534            if certreqs is None:
535                certreqs = ssl.CERT_NONE
536            self.certificate = certificate
537            self.protocol = ssl_version
538            self.certreqs = certreqs
539            self.cacerts = cacerts
540            self.ciphers = ciphers
541            self.chatty = chatty
542            self.connectionchatty = connectionchatty
543            self.starttls_server = starttls_server
544            self.sock = socket.socket()
545            self.flag = None
546            if wrap_accepting_socket:
547                self.sock = ssl.wrap_socket(self.sock, server_side=True,
548                                            certfile=self.certificate,
549                                            cert_reqs = self.certreqs,
550                                            ca_certs = self.cacerts,
551                                            ssl_version = self.protocol,
552                                            ciphers = self.ciphers)
553                if test_support.verbose and self.chatty:
554                    sys.stdout.write(' server:  wrapped server socket as %s\n' % str(self.sock))
555            self.port = test_support.bind_port(self.sock)
556            self.active = False
557            self.conn_errors = []
558            threading.Thread.__init__(self)
559            self.daemon = True
560
561        def __enter__(self):
562            self.start(threading.Event())
563            self.flag.wait()
564            return self
565
566        def __exit__(self, *args):
567            self.stop()
568            self.join()
569
570        def start(self, flag=None):
571            self.flag = flag
572            threading.Thread.start(self)
573
574        def run(self):
575            self.sock.settimeout(0.05)
576            self.sock.listen(5)
577            self.active = True
578            if self.flag:
579                # signal an event
580                self.flag.set()
581            while self.active:
582                try:
583                    newconn, connaddr = self.sock.accept()
584                    if test_support.verbose and self.chatty:
585                        sys.stdout.write(' server:  new connection from '
586                                         + str(connaddr) + '\n')
587                    handler = self.ConnectionHandler(self, newconn)
588                    handler.start()
589                    handler.join()
590                except socket.timeout:
591                    pass
592                except KeyboardInterrupt:
593                    self.stop()
594            self.sock.close()
595
596        def stop(self):
597            self.active = False
598
599    class AsyncoreEchoServer(threading.Thread):
600
601        class EchoServer(asyncore.dispatcher):
602
603            class ConnectionHandler(asyncore.dispatcher_with_send):
604
605                def __init__(self, conn, certfile):
606                    asyncore.dispatcher_with_send.__init__(self, conn)
607                    self.socket = ssl.wrap_socket(conn, server_side=True,
608                                                  certfile=certfile,
609                                                  do_handshake_on_connect=False)
610                    self._ssl_accepting = True
611
612                def readable(self):
613                    if isinstance(self.socket, ssl.SSLSocket):
614                        while self.socket.pending() > 0:
615                            self.handle_read_event()
616                    return True
617
618                def _do_ssl_handshake(self):
619                    try:
620                        self.socket.do_handshake()
621                    except ssl.SSLError, err:
622                        if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
623                                           ssl.SSL_ERROR_WANT_WRITE):
624                            return
625                        elif err.args[0] == ssl.SSL_ERROR_EOF:
626                            return self.handle_close()
627                        raise
628                    except socket.error, err:
629                        if err.args[0] == errno.ECONNABORTED:
630                            return self.handle_close()
631                    else:
632                        self._ssl_accepting = False
633
634                def handle_read(self):
635                    if self._ssl_accepting:
636                        self._do_ssl_handshake()
637                    else:
638                        data = self.recv(1024)
639                        if data and data.strip() != 'over':
640                            self.send(data.lower())
641
642                def handle_close(self):
643                    self.close()
644                    if test_support.verbose:
645                        sys.stdout.write(" server:  closed connection %s\n" % self.socket)
646
647                def handle_error(self):
648                    raise
649
650            def __init__(self, certfile):
651                self.certfile = certfile
652                asyncore.dispatcher.__init__(self)
653                self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
654                self.port = test_support.bind_port(self.socket)
655                self.listen(5)
656
657            def handle_accept(self):
658                sock_obj, addr = self.accept()
659                if test_support.verbose:
660                    sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
661                self.ConnectionHandler(sock_obj, self.certfile)
662
663            def handle_error(self):
664                raise
665
666        def __init__(self, certfile):
667            self.flag = None
668            self.active = False
669            self.server = self.EchoServer(certfile)
670            self.port = self.server.port
671            threading.Thread.__init__(self)
672            self.daemon = True
673
674        def __str__(self):
675            return "<%s %s>" % (self.__class__.__name__, self.server)
676
677        def __enter__(self):
678            self.start(threading.Event())
679            self.flag.wait()
680            return self
681
682        def __exit__(self, *args):
683            if test_support.verbose:
684                sys.stdout.write(" cleanup: stopping server.\n")
685            self.stop()
686            if test_support.verbose:
687                sys.stdout.write(" cleanup: joining server thread.\n")
688            self.join()
689            if test_support.verbose:
690                sys.stdout.write(" cleanup: successfully joined.\n")
691
692        def start(self, flag=None):
693            self.flag = flag
694            threading.Thread.start(self)
695
696        def run(self):
697            self.active = True
698            if self.flag:
699                self.flag.set()
700            while self.active:
701                asyncore.loop(0.05)
702
703        def stop(self):
704            self.active = False
705            self.server.close()
706
707    class SocketServerHTTPSServer(threading.Thread):
708
709        class HTTPSServer(HTTPServer):
710
711            def __init__(self, server_address, RequestHandlerClass, certfile):
712                HTTPServer.__init__(self, server_address, RequestHandlerClass)
713                # we assume the certfile contains both private key and certificate
714                self.certfile = certfile
715                self.allow_reuse_address = True
716
717            def __str__(self):
718                return ('<%s %s:%s>' %
719                        (self.__class__.__name__,
720                         self.server_name,
721                         self.server_port))
722
723            def get_request(self):
724                # override this to wrap socket with SSL
725                sock, addr = self.socket.accept()
726                sslconn = ssl.wrap_socket(sock, server_side=True,
727                                          certfile=self.certfile)
728                return sslconn, addr
729
730        class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
731            # need to override translate_path to get a known root,
732            # instead of using os.curdir, since the test could be
733            # run from anywhere
734
735            server_version = "TestHTTPS/1.0"
736
737            root = None
738
739            def translate_path(self, path):
740                """Translate a /-separated PATH to the local filename syntax.
741
742                Components that mean special things to the local file system
743                (e.g. drive or directory names) are ignored.  (XXX They should
744                probably be diagnosed.)
745
746                """
747                # abandon query parameters
748                path = urlparse.urlparse(path)[2]
749                path = os.path.normpath(urllib.unquote(path))
750                words = path.split('/')
751                words = filter(None, words)
752                path = self.root
753                for word in words:
754                    drive, word = os.path.splitdrive(word)
755                    head, word = os.path.split(word)
756                    if word in self.root: continue
757                    path = os.path.join(path, word)
758                return path
759
760            def log_message(self, format, *args):
761
762                # we override this to suppress logging unless "verbose"
763
764                if test_support.verbose:
765                    sys.stdout.write(" server (%s:%d %s):\n   [%s] %s\n" %
766                                     (self.server.server_address,
767                                      self.server.server_port,
768                                      self.request.cipher(),
769                                      self.log_date_time_string(),
770                                      format%args))
771
772
773        def __init__(self, certfile):
774            self.flag = None
775            self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
776            self.server = self.HTTPSServer(
777                (HOST, 0), self.RootedHTTPRequestHandler, certfile)
778            self.port = self.server.server_port
779            threading.Thread.__init__(self)
780            self.daemon = True
781
782        def __str__(self):
783            return "<%s %s>" % (self.__class__.__name__, self.server)
784
785        def start(self, flag=None):
786            self.flag = flag
787            threading.Thread.start(self)
788
789        def run(self):
790            if self.flag:
791                self.flag.set()
792            self.server.serve_forever(0.05)
793
794        def stop(self):
795            self.server.shutdown()
796
797
798    def bad_cert_test(certfile):
799        """
800        Launch a server with CERT_REQUIRED, and check that trying to
801        connect to it with the given client certificate fails.
802        """
803        server = ThreadedEchoServer(CERTFILE,
804                                    certreqs=ssl.CERT_REQUIRED,
805                                    cacerts=CERTFILE, chatty=False)
806        with server:
807            try:
808                s = ssl.wrap_socket(socket.socket(),
809                                    certfile=certfile,
810                                    ssl_version=ssl.PROTOCOL_TLSv1)
811                s.connect((HOST, server.port))
812            except ssl.SSLError, x:
813                if test_support.verbose:
814                    sys.stdout.write("\nSSLError is %s\n" % x[1])
815            except socket.error, x:
816                if test_support.verbose:
817                    sys.stdout.write("\nsocket.error is %s\n" % x[1])
818            else:
819                raise AssertionError("Use of invalid cert should have failed!")
820
821    def server_params_test(certfile, protocol, certreqs, cacertsfile,
822                           client_certfile, client_protocol=None, indata="FOO\n",
823                           ciphers=None, chatty=True, connectionchatty=False,
824                           wrap_accepting_socket=False):
825        """
826        Launch a server, connect a client to it and try various reads
827        and writes.
828        """
829        server = ThreadedEchoServer(certfile,
830                                    certreqs=certreqs,
831                                    ssl_version=protocol,
832                                    cacerts=cacertsfile,
833                                    ciphers=ciphers,
834                                    chatty=chatty,
835                                    connectionchatty=connectionchatty,
836                                    wrap_accepting_socket=wrap_accepting_socket)
837        with server:
838            # try to connect
839            if client_protocol is None:
840                client_protocol = protocol
841            s = ssl.wrap_socket(socket.socket(),
842                                certfile=client_certfile,
843                                ca_certs=cacertsfile,
844                                ciphers=ciphers,
845                                cert_reqs=certreqs,
846                                ssl_version=client_protocol)
847            s.connect((HOST, server.port))
848            for arg in [indata, bytearray(indata), memoryview(indata)]:
849                if connectionchatty:
850                    if test_support.verbose:
851                        sys.stdout.write(
852                            " client:  sending %s...\n" % (repr(arg)))
853                s.write(arg)
854                outdata = s.read()
855                if connectionchatty:
856                    if test_support.verbose:
857                        sys.stdout.write(" client:  read %s\n" % repr(outdata))
858                if outdata != indata.lower():
859                    raise AssertionError(
860                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
861                        % (outdata[:min(len(outdata),20)], len(outdata),
862                           indata[:min(len(indata),20)].lower(), len(indata)))
863            s.write("over\n")
864            if connectionchatty:
865                if test_support.verbose:
866                    sys.stdout.write(" client:  closing connection.\n")
867            s.close()
868
869    def try_protocol_combo(server_protocol,
870                           client_protocol,
871                           expect_success,
872                           certsreqs=None):
873        if certsreqs is None:
874            certsreqs = ssl.CERT_NONE
875        certtype = {
876            ssl.CERT_NONE: "CERT_NONE",
877            ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
878            ssl.CERT_REQUIRED: "CERT_REQUIRED",
879        }[certsreqs]
880        if test_support.verbose:
881            formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
882            sys.stdout.write(formatstr %
883                             (ssl.get_protocol_name(client_protocol),
884                              ssl.get_protocol_name(server_protocol),
885                              certtype))
886        try:
887            # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
888            # will send an SSLv3 hello (rather than SSLv2) starting from
889            # OpenSSL 1.0.0 (see issue #8322).
890            server_params_test(CERTFILE, server_protocol, certsreqs,
891                               CERTFILE, CERTFILE, client_protocol,
892                               ciphers="ALL", chatty=False)
893        # Protocol mismatch can result in either an SSLError, or a
894        # "Connection reset by peer" error.
895        except ssl.SSLError:
896            if expect_success:
897                raise
898        except socket.error as e:
899            if expect_success or e.errno != errno.ECONNRESET:
900                raise
901        else:
902            if not expect_success:
903                raise AssertionError(
904                    "Client protocol %s succeeded with server protocol %s!"
905                    % (ssl.get_protocol_name(client_protocol),
906                       ssl.get_protocol_name(server_protocol)))
907
908
909    class ThreadedTests(unittest.TestCase):
910
911        def test_rude_shutdown(self):
912            """A brutal shutdown of an SSL server should raise an IOError
913            in the client when attempting handshake.
914            """
915            listener_ready = threading.Event()
916            listener_gone = threading.Event()
917
918            s = socket.socket()
919            port = test_support.bind_port(s, HOST)
920
921            # `listener` runs in a thread.  It sits in an accept() until
922            # the main thread connects.  Then it rudely closes the socket,
923            # and sets Event `listener_gone` to let the main thread know
924            # the socket is gone.
925            def listener():
926                s.listen(5)
927                listener_ready.set()
928                s.accept()
929                s.close()
930                listener_gone.set()
931
932            def connector():
933                listener_ready.wait()
934                c = socket.socket()
935                c.connect((HOST, port))
936                listener_gone.wait()
937                try:
938                    ssl_sock = ssl.wrap_socket(c)
939                except IOError:
940                    pass
941                else:
942                    self.fail('connecting to closed SSL socket should have failed')
943
944            t = threading.Thread(target=listener)
945            t.start()
946            try:
947                connector()
948            finally:
949                t.join()
950
951        @skip_if_broken_ubuntu_ssl
952        def test_echo(self):
953            """Basic test of an SSL client connecting to a server"""
954            if test_support.verbose:
955                sys.stdout.write("\n")
956            server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
957                               CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
958                               chatty=True, connectionchatty=True)
959
960        def test_getpeercert(self):
961            if test_support.verbose:
962                sys.stdout.write("\n")
963            s2 = socket.socket()
964            server = ThreadedEchoServer(CERTFILE,
965                                        certreqs=ssl.CERT_NONE,
966                                        ssl_version=ssl.PROTOCOL_SSLv23,
967                                        cacerts=CERTFILE,
968                                        chatty=False)
969            with server:
970                s = ssl.wrap_socket(socket.socket(),
971                                    certfile=CERTFILE,
972                                    ca_certs=CERTFILE,
973                                    cert_reqs=ssl.CERT_REQUIRED,
974                                    ssl_version=ssl.PROTOCOL_SSLv23)
975                s.connect((HOST, server.port))
976                cert = s.getpeercert()
977                self.assertTrue(cert, "Can't get peer certificate.")
978                cipher = s.cipher()
979                if test_support.verbose:
980                    sys.stdout.write(pprint.pformat(cert) + '\n')
981                    sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
982                if 'subject' not in cert:
983                    self.fail("No subject field in certificate: %s." %
984                              pprint.pformat(cert))
985                if ((('organizationName', 'Python Software Foundation'),)
986                    not in cert['subject']):
987                    self.fail(
988                        "Missing or invalid 'organizationName' field in certificate subject; "
989                        "should be 'Python Software Foundation'.")
990                s.close()
991
992        def test_empty_cert(self):
993            """Connecting with an empty cert file"""
994            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
995                                      "nullcert.pem"))
996        def test_malformed_cert(self):
997            """Connecting with a badly formatted certificate (syntax error)"""
998            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
999                                       "badcert.pem"))
1000        def test_nonexisting_cert(self):
1001            """Connecting with a non-existing cert file"""
1002            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1003                                       "wrongcert.pem"))
1004        def test_malformed_key(self):
1005            """Connecting with a badly formatted key (syntax error)"""
1006            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
1007                                       "badkey.pem"))
1008
1009        @skip_if_broken_ubuntu_ssl
1010        def test_protocol_sslv2(self):
1011            """Connecting to an SSLv2 server with various client options"""
1012            if test_support.verbose:
1013                sys.stdout.write("\n")
1014            if not hasattr(ssl, 'PROTOCOL_SSLv2'):
1015                self.skipTest("PROTOCOL_SSLv2 needed")
1016            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
1017            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
1018            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
1019            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
1020            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
1021            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
1022
1023        @skip_if_broken_ubuntu_ssl
1024        def test_protocol_sslv23(self):
1025            """Connecting to an SSLv23 server with various client options"""
1026            if test_support.verbose:
1027                sys.stdout.write("\n")
1028            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
1029            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
1030            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
1031
1032            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
1033            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
1034            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
1035
1036            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
1037            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
1038            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
1039
1040        @skip_if_broken_ubuntu_ssl
1041        def test_protocol_sslv3(self):
1042            """Connecting to an SSLv3 server with various client options"""
1043            if test_support.verbose:
1044                sys.stdout.write("\n")
1045            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
1046            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
1047            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
1048            if hasattr(ssl, 'PROTOCOL_SSLv2'):
1049                try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
1050            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
1051
1052        @skip_if_broken_ubuntu_ssl
1053        def test_protocol_tlsv1(self):
1054            """Connecting to a TLSv1 server with various client options"""
1055            if test_support.verbose:
1056                sys.stdout.write("\n")
1057            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
1058            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
1059            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
1060            if hasattr(ssl, 'PROTOCOL_SSLv2'):
1061                try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
1062            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
1063
1064        def test_starttls(self):
1065            """Switching from clear text to encrypted and back again."""
1066            msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
1067
1068            server = ThreadedEchoServer(CERTFILE,
1069                                        ssl_version=ssl.PROTOCOL_TLSv1,
1070                                        starttls_server=True,
1071                                        chatty=True,
1072                                        connectionchatty=True)
1073            wrapped = False
1074            with server:
1075                s = socket.socket()
1076                s.setblocking(1)
1077                s.connect((HOST, server.port))
1078                if test_support.verbose:
1079                    sys.stdout.write("\n")
1080                for indata in msgs:
1081                    if test_support.verbose:
1082                        sys.stdout.write(
1083                            " client:  sending %s...\n" % repr(indata))
1084                    if wrapped:
1085                        conn.write(indata)
1086                        outdata = conn.read()
1087                    else:
1088                        s.send(indata)
1089                        outdata = s.recv(1024)
1090                    if (indata == "STARTTLS" and
1091                        outdata.strip().lower().startswith("ok")):
1092                        # STARTTLS ok, switch to secure mode
1093                        if test_support.verbose:
1094                            sys.stdout.write(
1095                                " client:  read %s from server, starting TLS...\n"
1096                                % repr(outdata))
1097                        conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
1098                        wrapped = True
1099                    elif (indata == "ENDTLS" and
1100                        outdata.strip().lower().startswith("ok")):
1101                        # ENDTLS ok, switch back to clear text
1102                        if test_support.verbose:
1103                            sys.stdout.write(
1104                                " client:  read %s from server, ending TLS...\n"
1105                                % repr(outdata))
1106                        s = conn.unwrap()
1107                        wrapped = False
1108                    else:
1109                        if test_support.verbose:
1110                            sys.stdout.write(
1111                                " client:  read %s from server\n" % repr(outdata))
1112                if test_support.verbose:
1113                    sys.stdout.write(" client:  closing connection.\n")
1114                if wrapped:
1115                    conn.write("over\n")
1116                else:
1117                    s.send("over\n")
1118                s.close()
1119
1120        def test_socketserver(self):
1121            """Using a SocketServer to create and manage SSL connections."""
1122            server = SocketServerHTTPSServer(CERTFILE)
1123            flag = threading.Event()
1124            server.start(flag)
1125            # wait for it to start
1126            flag.wait()
1127            # try to connect
1128            try:
1129                if test_support.verbose:
1130                    sys.stdout.write('\n')
1131                with open(CERTFILE, 'rb') as f:
1132                    d1 = f.read()
1133                d2 = ''
1134                # now fetch the same data from the HTTPS server
1135                url = 'https://127.0.0.1:%d/%s' % (
1136                    server.port, os.path.split(CERTFILE)[1])
1137                with test_support.check_py3k_warnings():
1138                    f = urllib.urlopen(url)
1139                dlen = f.info().getheader("content-length")
1140                if dlen and (int(dlen) > 0):
1141                    d2 = f.read(int(dlen))
1142                    if test_support.verbose:
1143                        sys.stdout.write(
1144                            " client: read %d bytes from remote server '%s'\n"
1145                            % (len(d2), server))
1146                f.close()
1147                self.assertEqual(d1, d2)
1148            finally:
1149                server.stop()
1150                server.join()
1151
1152        def test_wrapped_accept(self):
1153            """Check the accept() method on SSL sockets."""
1154            if test_support.verbose:
1155                sys.stdout.write("\n")
1156            server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED,
1157                               CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23,
1158                               chatty=True, connectionchatty=True,
1159                               wrap_accepting_socket=True)
1160
1161        def test_asyncore_server(self):
1162            """Check the example asyncore integration."""
1163            indata = "TEST MESSAGE of mixed case\n"
1164
1165            if test_support.verbose:
1166                sys.stdout.write("\n")
1167            server = AsyncoreEchoServer(CERTFILE)
1168            with server:
1169                s = ssl.wrap_socket(socket.socket())
1170                s.connect(('127.0.0.1', server.port))
1171                if test_support.verbose:
1172                    sys.stdout.write(
1173                        " client:  sending %s...\n" % (repr(indata)))
1174                s.write(indata)
1175                outdata = s.read()
1176                if test_support.verbose:
1177                    sys.stdout.write(" client:  read %s\n" % repr(outdata))
1178                if outdata != indata.lower():
1179                    self.fail(
1180                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
1181                        % (outdata[:min(len(outdata),20)], len(outdata),
1182                           indata[:min(len(indata),20)].lower(), len(indata)))
1183                s.write("over\n")
1184                if test_support.verbose:
1185                    sys.stdout.write(" client:  closing connection.\n")
1186                s.close()
1187
1188        def test_recv_send(self):
1189            """Test recv(), send() and friends."""
1190            if test_support.verbose:
1191                sys.stdout.write("\n")
1192
1193            server = ThreadedEchoServer(CERTFILE,
1194                                        certreqs=ssl.CERT_NONE,
1195                                        ssl_version=ssl.PROTOCOL_TLSv1,
1196                                        cacerts=CERTFILE,
1197                                        chatty=True,
1198                                        connectionchatty=False)
1199            with server:
1200                s = ssl.wrap_socket(socket.socket(),
1201                                    server_side=False,
1202                                    certfile=CERTFILE,
1203                                    ca_certs=CERTFILE,
1204                                    cert_reqs=ssl.CERT_NONE,
1205                                    ssl_version=ssl.PROTOCOL_TLSv1)
1206                s.connect((HOST, server.port))
1207                # helper methods for standardising recv* method signatures
1208                def _recv_into():
1209                    b = bytearray("\0"*100)
1210                    count = s.recv_into(b)
1211                    return b[:count]
1212
1213                def _recvfrom_into():
1214                    b = bytearray("\0"*100)
1215                    count, addr = s.recvfrom_into(b)
1216                    return b[:count]
1217
1218                # (name, method, whether to expect success, *args)
1219                send_methods = [
1220                    ('send', s.send, True, []),
1221                    ('sendto', s.sendto, False, ["some.address"]),
1222                    ('sendall', s.sendall, True, []),
1223                ]
1224                recv_methods = [
1225                    ('recv', s.recv, True, []),
1226                    ('recvfrom', s.recvfrom, False, ["some.address"]),
1227                    ('recv_into', _recv_into, True, []),
1228                    ('recvfrom_into', _recvfrom_into, False, []),
1229                ]
1230                data_prefix = u"PREFIX_"
1231
1232                for meth_name, send_meth, expect_success, args in send_methods:
1233                    indata = data_prefix + meth_name
1234                    try:
1235                        send_meth(indata.encode('ASCII', 'strict'), *args)
1236                        outdata = s.read()
1237                        outdata = outdata.decode('ASCII', 'strict')
1238                        if outdata != indata.lower():
1239                            self.fail(
1240                                "While sending with <<%s>> bad data "
1241                                "<<%r>> (%d) received; "
1242                                "expected <<%r>> (%d)\n" % (
1243                                    meth_name, outdata[:20], len(outdata),
1244                                    indata[:20], len(indata)
1245                                )
1246                            )
1247                    except ValueError as e:
1248                        if expect_success:
1249                            self.fail(
1250                                "Failed to send with method <<%s>>; "
1251                                "expected to succeed.\n" % (meth_name,)
1252                            )
1253                        if not str(e).startswith(meth_name):
1254                            self.fail(
1255                                "Method <<%s>> failed with unexpected "
1256                                "exception message: %s\n" % (
1257                                    meth_name, e
1258                                )
1259                            )
1260
1261                for meth_name, recv_meth, expect_success, args in recv_methods:
1262                    indata = data_prefix + meth_name
1263                    try:
1264                        s.send(indata.encode('ASCII', 'strict'))
1265                        outdata = recv_meth(*args)
1266                        outdata = outdata.decode('ASCII', 'strict')
1267                        if outdata != indata.lower():
1268                            self.fail(
1269                                "While receiving with <<%s>> bad data "
1270                                "<<%r>> (%d) received; "
1271                                "expected <<%r>> (%d)\n" % (
1272                                    meth_name, outdata[:20], len(outdata),
1273                                    indata[:20], len(indata)
1274                                )
1275                            )
1276                    except ValueError as e:
1277                        if expect_success:
1278                            self.fail(
1279                                "Failed to receive with method <<%s>>; "
1280                                "expected to succeed.\n" % (meth_name,)
1281                            )
1282                        if not str(e).startswith(meth_name):
1283                            self.fail(
1284                                "Method <<%s>> failed with unexpected "
1285                                "exception message: %s\n" % (
1286                                    meth_name, e
1287                                )
1288                            )
1289                        # consume data
1290                        s.read()
1291
1292                s.write("over\n".encode("ASCII", "strict"))
1293                s.close()
1294
1295        def test_handshake_timeout(self):
1296            # Issue #5103: SSL handshake must respect the socket timeout
1297            server = socket.socket(socket.AF_INET)
1298            host = "127.0.0.1"
1299            port = test_support.bind_port(server)
1300            started = threading.Event()
1301            finish = False
1302
1303            def serve():
1304                server.listen(5)
1305                started.set()
1306                conns = []
1307                while not finish:
1308                    r, w, e = select.select([server], [], [], 0.1)
1309                    if server in r:
1310                        # Let the socket hang around rather than having
1311                        # it closed by garbage collection.
1312                        conns.append(server.accept()[0])
1313
1314            t = threading.Thread(target=serve)
1315            t.start()
1316            started.wait()
1317
1318            try:
1319                try:
1320                    c = socket.socket(socket.AF_INET)
1321                    c.settimeout(0.2)
1322                    c.connect((host, port))
1323                    # Will attempt handshake and time out
1324                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
1325                                            ssl.wrap_socket, c)
1326                finally:
1327                    c.close()
1328                try:
1329                    c = socket.socket(socket.AF_INET)
1330                    c.settimeout(0.2)
1331                    c = ssl.wrap_socket(c)
1332                    # Will attempt handshake and time out
1333                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
1334                                            c.connect, (host, port))
1335                finally:
1336                    c.close()
1337            finally:
1338                finish = True
1339                t.join()
1340                server.close()
1341
1342        def test_default_ciphers(self):
1343            with ThreadedEchoServer(CERTFILE,
1344                                    ssl_version=ssl.PROTOCOL_SSLv23,
1345                                    chatty=False) as server:
1346                sock = socket.socket()
1347                try:
1348                    # Force a set of weak ciphers on our client socket
1349                    try:
1350                        s = ssl.wrap_socket(sock,
1351                                            ssl_version=ssl.PROTOCOL_SSLv23,
1352                                            ciphers="DES")
1353                    except ssl.SSLError:
1354                        self.skipTest("no DES cipher available")
1355                    with self.assertRaises((OSError, ssl.SSLError)):
1356                        s.connect((HOST, server.port))
1357                finally:
1358                    sock.close()
1359            self.assertIn("no shared cipher", str(server.conn_errors[0]))
1360
1361
1362def test_main(verbose=False):
1363    global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT
1364    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
1365                            "keycert.pem")
1366    SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
1367        os.path.dirname(__file__) or os.curdir,
1368        "https_svn_python_org_root.pem")
1369    NOKIACERT = os.path.join(os.path.dirname(__file__) or os.curdir,
1370                             "nokia.pem")
1371
1372    if (not os.path.exists(CERTFILE) or
1373        not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT) or
1374        not os.path.exists(NOKIACERT)):
1375        raise test_support.TestFailed("Can't read certificate files!")
1376
1377    tests = [BasicTests, BasicSocketTests]
1378
1379    if test_support.is_resource_enabled('network'):
1380        tests.append(NetworkedTests)
1381
1382    if _have_threads:
1383        thread_info = test_support.threading_setup()
1384        if thread_info and test_support.is_resource_enabled('network'):
1385            tests.append(ThreadedTests)
1386
1387    try:
1388        test_support.run_unittest(*tests)
1389    finally:
1390        if _have_threads:
1391            test_support.threading_cleanup(*thread_info)
1392
1393if __name__ == "__main__":
1394    test_main()
1395