1import base64
2import os
3import email
4import urllib.parse
5import urllib.request
6import http.server
7import unittest
8import hashlib
9
10from test import support
11
# threading is obtained through the test.support helper; the test classes
# below are guarded with skipUnless(threading, ...).
threading = support.import_module('threading')

# ssl is optional: fall back to None so that ssl-dependent tests can detect
# its absence (see the "ssl is None" check in test_https_sni).
try:
    import ssl
except ImportError:
    ssl = None

# Directory containing this module; the certificate files are looked up
# relative to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
24
25
26# Loopback http server infrastructure
27
class LoopbackHttpServer(http.server.HTTPServer):
    """HTTPServer variant tuned for loopback testing.

    Both the listening socket and every accepted connection are given
    explicit timeouts so that a test never blocks indefinitely.
    """

    def __init__(self, server_address, RequestHandlerClass):
        super().__init__(server_address, RequestHandlerClass)

        # Keep accept() from blocking for long so that the serving loop
        # can be stopped easily.
        self.socket.settimeout(0.1)

    def get_request(self):
        """HTTPServer method, overridden.

        Accept one connection, put a 10 second timeout on it and return
        the ``(socket, client_address)`` pair.
        """
        connection, peer_address = self.socket.accept()

        # Bound every operation on the loopback connection so that a
        # stalled peer makes deadlocks less likely.
        connection.settimeout(10.0)

        return connection, peer_address
53
class LoopbackHttpServerThread(threading.Thread):
    """Thread that serves a LoopbackHttpServer until stop() is called."""

    def __init__(self, request_handler):
        super().__init__()
        # Polled by run() between requests; flipped by stop().
        self._stop_server = False
        # Set as soon as run() starts executing.
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        # Bind to port 0 and read the actual port back from the server.
        address = ("127.0.0.1", 0)
        self.httpd = LoopbackHttpServer(address, request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Ask the serving loop to finish, wait for the thread to exit
        and close the server socket."""
        self._stop_server = True
        self.join()
        self.httpd.server_close()

    def run(self):
        """Signal readiness, then handle one request at a time until a
        stop has been requested."""
        self.ready.set()
        while True:
            if self._stop_server:
                break
            self.httpd.handle_request()
78
79# Authentication infrastructure
80
class DigestAuthHandler:
    """Server-side helper performing HTTP digest authentication for a
    proxy-style request handler.

    It issues ``407`` challenges carrying one-time nonces and validates
    the ``Proxy-Authorization`` header of follow-up requests against the
    configured users.
    """

    def __init__(self):
        # Counter from which each new nonce is derived.
        self._request_num = 0
        # Nonces handed out in challenges and not yet redeemed.
        self._nonces = []
        # Mapping of user name -> password; empty disables authentication.
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the ``qop`` value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the accepted credentials as a ``{username: password}`` dict."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Create, remember and return a fresh nonce (hex MD5 of a counter)."""
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a ``{name: value}`` dict.

        Everything up to the first space (the scheme token) is dropped;
        the remainder is treated as comma-separated ``name=value`` pairs.
        Surrounding whitespace and one pair of enclosing double quotes
        are removed from each value.

        Limitation: commas inside quoted values are not supported.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            part = part.strip()
            if not part:
                # Tolerate empty list elements such as a trailing comma.
                continue
            # Split on the first "=" only: values such as URIs with a
            # query string legitimately contain further "=" characters.
            name, _, value = part.partition("=")
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if the digest response in *auth_dict* is correct.

        The expected response is recomputed from *password*, *method*
        and *uri* together with the client-supplied fields.  Credentials
        lacking any field needed for that computation are rejected
        (False) instead of raising KeyError.
        """
        required = ("username", "realm", "nonce", "nc", "cnonce", "qop",
                    "response")
        if any(field not in auth_dict for field in required):
            return False

        final_dict = dict(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        # The URI from the request line wins over whatever the client
        # put in its header.
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str.encode("ascii")).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 digest challenge with a fresh nonce.

        Always returns False so callers can return the result directly
        as "not authenticated".
        """
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        # Well-formed parameter list: no dangling separator after the
        # last parameter.
        challenge = 'Digest realm="%s", qop="%s", nonce="%s"' % (
            self._realm_name, self._qop, self._generate_nonce())
        request_handler.send_header('Proxy-Authenticate', challenge)
        # XXX: Not sure if a 'Connection: close' header should be sent
        # here as well.
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        On failure (missing or malformed credentials, unknown user,
        unknown or already used nonce, wrong response) a new 407
        challenge has been written to *request_handler*.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """
        if not self._users:
            return True

        auth_header = request_handler.headers.get("Proxy-Authorization")
        if auth_header is None:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(auth_header)

        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        # Nonces are single-use: consume it before validating.
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib.request uses the full path, so accept either of them.
        candidate_paths = (request_handler.path, request_handler.short_path)
        if any(self._validate_auth(auth_dict, password,
                                   request_handler.command, path)
               for path in candidate_paths):
            return True
        return self._return_auth_challenge(request_handler)
200
201
class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    """Request handler that protects GET with HTTP basic authentication."""
    # Credentials accepted by the server, and their encoded header form.
    USER = 'testUser'
    PASSWD = 'testPass'
    REALM = 'Test'
    USER_PASSWD = ":".join((USER, PASSWD))
    ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        """Discard log output so test runs stay quiet."""
        pass

    def do_HEAD(self):
        """Send a bare 200 response with an HTML content type."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Send a 401 response carrying the basic-auth challenge for REALM."""
        self.send_response(401)
        self.send_header("WWW-Authenticate", 'Basic realm="%s"' % self.REALM)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Answer 200 for the expected credentials, otherwise challenge."""
        credentials = self.headers.get("Authorization", "")

        if not credentials:
            self.do_AUTHHEAD()
            self.wfile.write(b"No Auth header received")
            return

        if credentials != "Basic " + self.ENCODED_AUTH:
            # Credentials were supplied but are wrong: challenge again.
            self.do_AUTHHEAD()
            return

        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"It works")
241
242
243
244# Proxy test infrastructure
245
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # Store the auth helper first: the parent's __init__() will try
        # to call do_GET(), which needs it.
        self.digest_auth_handler = digest_auth_handler
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        """Discard log output; write format % args to stderr here when
        debugging."""
        pass

    def do_GET(self):
        """Authenticate the request and, on success, send the canned page."""
        parsed_url = urllib.parse.urlparse(self.path, "http")
        # Path component only; read by the digest auth helper.
        self.short_path = parsed_url.path

        if not self.digest_auth_handler.handle_request(self):
            return

        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        greeting = "You've reached %s!<BR>" % self.path
        self.wfile.write(greeting.encode("ascii"))
        self.wfile.write(b"Our apologies, but our server is down due to "
                         b"a sudden zombie invasion.")
276
277# Test cases
278
@unittest.skipUnless(threading, "Threading required for this test.")
class BasicAuthTests(unittest.TestCase):
    """Exercise urllib.request.HTTPBasicAuthHandler against a loopback
    server that requires basic authentication."""

    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super(BasicAuthTests, self).setUp()
        # With Basic Authentication.  The server thread sets
        # protocol_version on the handler object it is given, so hand it
        # a wrapper function rather than the BasicAuthHandler class.
        def http_server_with_basic_auth_handler(*args, **kwargs):
            return BasicAuthHandler(*args, **kwargs)
        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        # Register the cleanup only once the thread is running: stop()
        # joins the thread, which is an error for an unstarted thread.
        self.addCleanup(self.server.stop)
        self.server.ready.wait()

    def test_basic_auth_success(self):
        """Correct credentials must let urlopen() succeed."""
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        try:
            response = urllib.request.urlopen(self.server_url)
        except urllib.error.HTTPError:
            # fail() accepts a single message, so format it first.
            self.fail("Basic auth failed for the url: %s" % self.server_url)
        with response:
            self.assertTrue(response)

    def test_basic_auth_httperror(self):
        """Wrong credentials must surface as an HTTPError."""
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
314
315
@unittest.skipUnless(threading, "Threading required for this test.")
class ProxyAuthTests(unittest.TestCase):
    """Exercise urllib.request.ProxyDigestAuthHandler against a loopback
    fake proxy that requires digest authentication."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        # Use a cleanup rather than tearDown(): cleanups also run when a
        # later step of setUp() fails, so the thread is never leaked.
        self.addCleanup(self.server.stop)
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        # The with block closes the response even if reading fails.
        with self.opener.open(self.URL) as result:
            while result.read():
                pass

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result is not None:
            with result:
                while result.read():
                    pass
392
393
def GetRequestHandler(responses):
    """Return a new request-handler class that replays canned *responses*.

    *responses* is a list of ``(status_code, headers, body)`` tuples; one
    entry is popped from the front for every request handled.  *headers*
    is a sequence of ``(name, value)`` pairs whose values are
    %-formatted with a mapping providing ``port``.  The returned class
    records what it saw in its ``requests`` and ``headers_received``
    class attributes.
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        # Request paths (and POST bodies) in the order they arrived.
        requests = []
        # Headers of the most recent request.
        headers_received = []
        port = 80

        def do_GET(self):
            remaining = self.send_head()
            # write() reports how much it accepted; keep going until the
            # whole body has been handed over.
            while remaining:
                written = self.wfile.write(remaining)
                remaining = remaining[written:]

        def do_POST(self):
            length = int(self.headers["Content-Length"])
            payload = self.rfile.read(length)
            self.do_GET()
            self.requests.append(payload)

        def send_head(self):
            """Send status line and headers; return the body, if any."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, extra_headers, body = responses.pop(0)

            self.send_response(status)

            substitutions = {'port': self.port}
            for name, template in extra_headers:
                self.send_header(name, template % substitutions)

            if not body:
                self.end_headers()
                return None

            self.send_header("Content-type", "text/plain")
            self.end_headers()
            return body

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler
435
436
@unittest.skipUnless(threading, "Threading required for this test.")
class TestUrlopen(unittest.TestCase):
    """Tests urllib.request.urlopen against servers on the loopback
    interface.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  Covered here
    are redirection, chunked bodies, error statuses, the 'data' argument,
    HTTPS (context, cafile, cadefault, SNI), request headers and iteration
    over the response.  Proxies are deliberately bypassed (see setUp).
    """

    def setUp(self):
        super(TestUrlopen, self).setUp()

        # Ignore proxies for localhost tests.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = '*'
        os.environ['no_proxy'] = '*'

    def urlopen(self, url, data=None, **kwargs):
        """Open *url* and return the whole body as bytes, reading it
        through several different file methods."""
        chunks = []
        f = urllib.request.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            chunks.extend(f.readlines(200))
            chunks.append(f.readline())
            chunks.append(f.read(1024))
            chunks.append(f.read())
        finally:
            f.close()
        return b"".join(chunks)

    def start_server(self, responses=None):
        """Start a loopback HTTP server replaying *responses* and return
        its handler class (with ``port`` set to the listening port)."""
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        # Register the cleanup only once the thread is running: stop()
        # joins the thread, which is an error for an unstarted thread.
        self.addCleanup(self.server.stop)
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start an HTTPS server replaying *responses* and return its
        handler class; skips the test when HTTPS is unavailable."""
        if not hasattr(urllib.request, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
                        b'a\r\n'
                        b'hello worl\r\n'
                        b'1\r\n'
                        b'd\r\n'
                        b'0\r\n'
                        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            data = f.read()
            f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                             b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        with support.check_warnings(('', DeprecationWarning)):
            # Good cert
            data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                                cafile=CERT_localhost)
            self.assertEqual(data, b"we care a bit")
            # Bad cert
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)
            # Good cert, but mismatching hostname
            handler = self.start_https_server(certfile=CERT_fakehostname)
            with self.assertRaises(ssl.CertificateError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with support.check_warnings(('', DeprecationWarning)):
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name, "localhost")

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        with urllib.request.urlopen(req):
            pass
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        # The with block closes the response even when a check fails.
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as open_url:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_info(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen(
            "http://localhost:%s" % handler.port)
        with open_url:
            info_obj = open_url.info()
        self.assertIsInstance(info_obj, email.message.Message,
                              "object returned by 'info' is not an "
                              "instance of email.message.Message")
        self.assertEqual(info_obj.get_content_subtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        with open_url:
            url = open_url.geturl()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_iteration(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as data:
            fetched = list(data)
        # Comparing the whole list also catches an empty response, which
        # a bare "for line in data" loop would silently accept.
        self.assertEqual(fetched, [expected_response])

    def test_line_iteration(self):
        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
        expected_response = b"".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Counted separately so that an empty response yields a clean
        # assertion failure instead of a NameError on the loop variable.
        fetched = 0
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as data:
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
                fetched = index + 1
        self.assertEqual(fetched, len(lines))
655
656
# Snapshot returned by support.threading_setup(); stays None if
# setUpModule() never ran.
threads_key = None


def setUpModule():
    """Record the thread state before any test of this module runs."""
    # Store the threading_setup in a key and ensure that it is cleaned up
    # in the tearDown
    global threads_key
    threads_key = support.threading_setup()


def tearDownModule():
    """Check the thread state against the snapshot taken in setUpModule()."""
    if threads_key:
        # threading_cleanup() takes the snapshot values as separate
        # positional arguments, so the stored tuple has to be unpacked.
        support.threading_cleanup(*threads_key)


if __name__ == "__main__":
    unittest.main()
671