1#!/usr/bin/env python
2
3import unittest
4from test import test_support
5from test.test_urllib2 import sanepathname2url
6
7import socket
8import urllib2
9import os
10import sys
11
# Timeout passed to the opener for every URL fetched by
# OtherNetworkTests._test_urls().
TIMEOUT = 60  # seconds
13
14
15def _retry_thrice(func, exc, *args, **kwargs):
16    for i in range(3):
17        try:
18            return func(*args, **kwargs)
19        except exc, last_exc:
20            continue
21        except:
22            raise
23    raise last_exc
24
def _wrap_with_retry_thrice(func, exc):
    """Return a callable that runs *func* through ``_retry_thrice``.

    The returned wrapper forwards all of its positional and keyword
    arguments to *func*, using *exc* as the exception to retry on.
    """
    def wrapped(*call_args, **call_kwargs):
        # Delegate every invocation to the shared retry helper.
        return _retry_thrice(func, exc, *call_args, **call_kwargs)

    return wrapped
29
# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times: urllib2.urlopen is wrapped so that a call
# raising urllib2.URLError is retried by _retry_thrice.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
33
34
class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features.

    The class currently defines no active tests; its only test is
    commented out below.
    """

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
#
#    def test_basic_auth(self):
#        import httplib
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(httplib.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")
72
73
class CloseSocketTest(unittest.TestCase):
    """Checks that closing a urllib2 response closes the socket file below it."""

    def test_close(self):
        import httplib

        # calling .close() on urllib2's response objects should close the
        # underlying socket

        url = "http://www.python.org/"
        # Guard the network access the same way the other tests in this
        # module do.
        with test_support.transient_internet(url):
            response = _urlopen_with_retry(url)
            # Make sure the connection is released even when one of the
            # assertions below fails before the explicit close().
            self.addCleanup(response.close)

            # delve deep into response to fetch socket._socketobject
            abused_fileobject = response.fp
            self.assertIs(abused_fileobject.__class__, socket._fileobject)
            httpresponse = abused_fileobject._sock
            self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
            fileobject = httpresponse.fp
            self.assertIs(fileobject.__class__, socket._fileobject)

            self.assertFalse(fileobject.closed)
            response.close()
            self.assertTrue(fileobject.closed)
94
class OtherNetworkTests(unittest.TestCase):
    """Smoke tests that open real ftp:, file: and http: URLs with urllib2."""

    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    def test_ftp(self):
        urls = [
            'ftp://ftp.kernel.org/pub/linux/kernel/README',
            'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file',
            #'ftp://ftp.kernel.org/pub/leenox/kernel/test',
            'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
                '/research-reports/00README-Legal-Rules-Regs',
            ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        TESTFN = test_support.TESTFN
        f = open(TESTFN, 'w')
        try:
            # ``with`` closes the file even if the write fails, so the
            # cleanup below never tries to remove a file that is still open.
            with f:
                f.write('hi there\n')
            urls = [
                'file:'+sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
                ]
            self._test_urls(urls, self._extra_handlers(), retry=True)
        finally:
            os.remove(TESTFN)

        self.assertRaises(ValueError, urllib2.urlopen,
                          './relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]

##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')

##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        urlwith_frag = "http://docs.python.org/2/glossary.html#glossary"
        with test_support.transient_internet(urlwith_frag):
            req = urllib2.Request(urlwith_frag)
            res = urllib2.urlopen(req)
            self.addCleanup(res.close)
            # The fragment must survive the round trip through urlopen().
            self.assertEqual(res.geturl(), urlwith_frag)

    def test_fileno(self):
        url = "http://www.python.org"
        # Guard the network access like the sibling tests do.
        with test_support.transient_internet(url):
            req = urllib2.Request(url)
            opener = urllib2.build_opener()
            res = opener.open(req)
            try:
                res.fileno()
            except AttributeError:
                self.fail("HTTPResponse object should return a valid fileno")
            finally:
                res.close()

    def test_custom_headers(self):
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            opener = urllib2.build_opener()
            request = urllib2.Request(url)
            self.assertFalse(request.header_items())
            # Only the effect on ``request`` matters here; close each
            # response right away instead of leaving it to the collector.
            opener.open(request).close()
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request).close()
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')

    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com' # No Connection:close
        with test_support.transient_internet(URL):
            response = urllib2.urlopen(URL)
            self.addCleanup(response.close)
            body = response.read()
            self.assertTrue(body)

    def _test_urls(self, urls, handlers, retry=True):
        """Open and read every entry of *urls* with an opener built from *handlers*.

        Each entry is either a URL string or a ``(url, data, expected_err)``
        tuple.  When ``expected_err`` is set and opening fails, the failure
        must be an instance of it; other open failures are only logged (and
        reported on stderr when they are timeouts).  With *retry* true the
        open call is retried on ``urllib2.URLError``.
        """
        import time
        import logging
        # Same logger name as the debugging hook in setUp(), so that
        # enabling the hook actually shows these messages.
        debug = logging.getLogger("test_urllib2net").debug

        urlopen = urllib2.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)

        for url in urls:
            if isinstance(url, tuple):
                url, req, expected_err = url
            else:
                req = expected_err = None
            with test_support.transient_internet(url):
                debug(url)
                try:
                    f = urlopen(url, req, TIMEOUT)
                except EnvironmentError as err:
                    # urllib2.URLError derives from IOError, so this one
                    # handler sees every open failure; a separate URLError
                    # clause after it could never run.
                    debug(err)
                    if expected_err:
                        msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                               (expected_err, url, req, type(err), err))
                        self.assertIsInstance(err, expected_err, msg)
                    elif isinstance(getattr(err, 'reason', None),
                                    socket.timeout):
                        sys.stderr.write("<timeout: %s>\n" % url)
                else:
                    try:
                        with test_support.transient_internet(url):
                            buf = f.read()
                            debug("read %d bytes" % len(buf))
                    except socket.timeout:
                        sys.stderr.write("<timeout: %s>\n" % url)
                    finally:
                        # Close the response whatever read() raised.
                        f.close()
            debug("******** next url coming up...")
            time.sleep(0.1)

    def _extra_handlers(self):
        """Return the extra handlers for ``_test_urls``: one CacheFTPHandler."""
        handlers = []

        cfh = urllib2.CacheFTPHandler()
        # Drop the cached FTP connections once the test is over.
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers
251
252
class TimeoutTest(unittest.TestCase):
    """Checks which timeout ends up on the socket behind a urlopen() response.

    Covers http: and ftp: URLs with no timeout, the socket module's default
    timeout, an explicit ``timeout=None`` and an explicit numeric timeout.
    """

    def _open(self, url, **kwargs):
        """Open *url* with retries and close the response when the test ends."""
        u = _urlopen_with_retry(url, **kwargs)
        self.addCleanup(u.close)
        return u

    def test_http_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.python.org"
        with test_support.transient_internet(url, timeout=None):
            u = self._open(url)
            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())

    def test_http_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.python.org"
        with test_support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = self._open(url)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)

    def test_http_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.python.org"
        with test_support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                # An explicit timeout=None must override the default.
                u = self._open(url, timeout=None)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())

    def test_http_timeout(self):
        url = "http://www.python.org"
        with test_support.transient_internet(url):
            u = self._open(url, timeout=120)
            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)

    FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"

    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST, timeout=None):
            u = self._open(self.FTP_HOST)
            self.assertIsNone(u.fp.fp._sock.gettimeout())

    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = self._open(self.FTP_HOST)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)

    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                # An explicit timeout=None must override the default.
                u = self._open(self.FTP_HOST, timeout=None)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.fp._sock.gettimeout())

    def test_ftp_timeout(self):
        with test_support.transient_internet(self.FTP_HOST):
            u = self._open(self.FTP_HOST, timeout=60)
            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
321
322
def test_main():
    """Run all test cases of this module; needs the "network" resource."""
    test_support.requires("network")
    # Same classes, in the same order, as they are handed to the runner.
    test_cases = (AuthTests, OtherNetworkTests, CloseSocketTest, TimeoutTest)
    test_support.run_unittest(*test_cases)


if __name__ == "__main__":
    test_main()
333