#!/usr/bin/env python

import urlparse
import urllib2
import BaseHTTPServer
import unittest
import hashlib

from test import test_support

mimetools = test_support.import_module('mimetools', deprecated=True)
threading = test_support.import_module('threading')

# Loopback http server infrastructure

class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self,
                                           server_address,
                                           RequestHandlerClass)

        # Set the timeout of our listening socket really low so
        # that we can stop the server easily.
        self.socket.settimeout(1.0)

    def get_request(self):
        """BaseHTTPServer method, overridden."""

        request, client_address = self.socket.accept()

        # It's a loopback connection, so setting the timeout
        # really low shouldn't affect anything, but should make
        # deadlocks less likely to occur.
        request.settimeout(10.0)

        return (request, client_address)

class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        self._stop = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # Set the stop flag.
        self._stop = True

        self.join()

    def run(self):
        self.ready.set()
        while not self._stop:
            self.httpd.handle_request()

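# Typical use of LoopbackHttpServerThread, mirroring the test cases below:
#
#     server = LoopbackHttpServerThread(SomeRequestHandlerClass)
#     server.start()
#     server.ready.wait()
#     # ... issue requests against http://127.0.0.1:<server.port>/ ...
#     server.stop()
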
# Authentication infrastructure

class DigestAuthHandler:
    """Handler for performing digest authentication."""

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        self._qop = qop

    def set_users(self, users):
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        self._realm_name = realm

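    # Nonces are derived from a per-handler request counter.  Each nonce is
    # remembered in self._nonces and removed the first time a client presents
    # it (see handle_request), so a nonce cannot be replayed.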
    def _generate_nonce(self):
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        parts = auth_str.split(",")

        auth_dict = {}
        for part in parts:
            name, value = part.split("=")
            name = name.strip()
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            else:
                value = value.strip()
            auth_dict[name] = value
        return auth_dict

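    # _validate_auth recomputes the digest response as specified by RFC 2617
    # for qop="auth":
    #
    #     HA1      = MD5(username ":" realm ":" password)
    #     HA2      = MD5(method ":" uri)
    #     response = MD5(HA1 ":" nonce ":" nc ":" cnonce ":" qop ":" HA2)
    #
    # and compares it against the response the client sent.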
    def _validate_auth(self, auth_dict, password, method, uri):
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """

        if len(self._users) == 0:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)
        else:
            auth_dict = self._create_auth_dict(
                request_handler.headers['Proxy-Authorization']
                )
            if auth_dict["username"] in self._users:
                password = self._users[auth_dict["username"]]
            else:
                return self._return_auth_challenge(request_handler)
            if auth_dict.get("nonce") not in self._nonces:
                return self._return_auth_challenge(request_handler)
            else:
                self._nonces.remove(auth_dict["nonce"])

            auth_validated = False

            # MSIE uses short_path in its validation, but Python's
            # urllib2 uses the full path, so we're going to see if
            # either of them works here.

            for path in [request_handler.path, request_handler.short_path]:
                if self._validate_auth(auth_dict,
                                       password,
                                       request_handler.command,
                                       path):
                    auth_validated = True

            if not auth_validated:
                return self._return_auth_challenge(request_handler)
            return True

# Proxy test infrastructure

class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")

# Test cases

class BaseTestCase(unittest.TestCase):
    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)


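# ProxyAuthTests sends every request through the FakeProxyHandler above (via
# urllib2.ProxyHandler) and exercises urllib2.ProxyDigestAuthHandler against
# the DigestAuthHandler's challenges.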
class ProxyAuthTests(BaseTestCase):
    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()


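# GetRequestHandler builds a request handler class whose replies are scripted
# in advance: `responses` is a list of (status_code, headers, body) tuples,
# served in order, one per request.  A '%s' placeholder in a header value is
# filled in with the server's port (see the redirection test below).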
def GetRequestHandler(responses):

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return body
            self.end_headers()

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler


class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using a local loopback HTTP server.

    These tests are not exhaustive.  It is assumed that the file-based tests
    cover the basic interface features reasonably well.  Tests for redirection
    and for the optional 'data' argument are included below.
    """

    def setUp(self):
        # Ignore proxies set in the environment so that requests really go to
        # the loopback server started by start_server().
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
        super(TestUrlopen, self).setUp()

    def start_server(self, responses):
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler


    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()


    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError, f:
                pass
            else:
                self.fail('404 should raise URLError')

            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()


    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port,
                                'get=with_feeling')
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()


    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            try:
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, mimetools.Message,
                                  "object returned by 'info' is not an "
                                  "instance of mimetools.Message")
            self.assertEqual(info_obj.getsubtype(), "plain")
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            url = open_url.geturl()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()


    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # As indicated by the comment below, this might fail with some ISPs,
        # so we run the test only when -unetwork/-uall is specified to
        # mitigate the problem a bit (see #17564).
        test_support.requires('network')
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice. The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' at the end, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for line in data:
                self.assertEqual(line, expected_response)
        finally:
            self.server.stop()

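    # Note: the 'z' prefix keeps unittest's default "test" method discovery
    # from picking this case up, so it is effectively disabled.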
    def ztest_line_iteration(self):
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        finally:
            self.server.stop()
        self.assertEqual(index + 1, len(lines))

def test_main():
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests, TestUrlopen)

if __name__ == "__main__":
    test_main()