1import unittest
2from test import test_support
3from test.test_urllib2 import sanepathname2url
4
5import socket
6import urllib2
7import os
8import sys
9
10TIMEOUT = 60  # seconds
11
12
13def _retry_thrice(func, exc, *args, **kwargs):
14    for i in range(3):
15        try:
16            return func(*args, **kwargs)
17        except exc, last_exc:
18            continue
19        except:
20            raise
21    raise last_exc
22
23def _wrap_with_retry_thrice(func, exc):
24    def wrapped(*args, **kwargs):
25        return _retry_thrice(func, exc, *args, **kwargs)
26    return wrapped
27
28# Connecting to remote hosts is flaky.  Make it more robust by retrying
29# the connection several times.
30_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
31
32
33class AuthTests(unittest.TestCase):
34    """Tests urllib2 authentication features."""
35
36## Disabled at the moment since there is no page under python.org which
37## could be used to HTTP authentication.
38#
39#    def test_basic_auth(self):
40#        import httplib
41#
42#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
43#        test_hostport = "www.python.org"
44#        test_realm = 'Test Realm'
45#        test_user = 'test.test_urllib2net'
46#        test_password = 'blah'
47#
48#        # failure
49#        try:
50#            _urlopen_with_retry(test_url)
51#        except urllib2.HTTPError, exc:
52#            self.assertEqual(exc.code, 401)
53#        else:
54#            self.fail("urlopen() should have failed with 401")
55#
56#        # success
57#        auth_handler = urllib2.HTTPBasicAuthHandler()
58#        auth_handler.add_password(test_realm, test_hostport,
59#                                  test_user, test_password)
60#        opener = urllib2.build_opener(auth_handler)
61#        f = opener.open('http://localhost/')
62#        response = _urlopen_with_retry("http://www.python.org/")
63#
64#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
65#        # reasons, let's not implement it!  (it's already implemented for proxy
66#        # specification strings (that is, URLs or authorities specifying a
67#        # proxy), so we must keep that)
68#        self.assertRaises(httplib.InvalidURL,
69#                          urllib2.urlopen, "http://evil:thing@example.com")
70
71
72class CloseSocketTest(unittest.TestCase):
73
74    def test_close(self):
75        import httplib
76
77        # calling .close() on urllib2's response objects should close the
78        # underlying socket
79
80        # delve deep into response to fetch socket._socketobject
81        response = _urlopen_with_retry("http://www.example.com/")
82        abused_fileobject = response.fp
83        self.assertIs(abused_fileobject.__class__, socket._fileobject)
84        httpresponse = abused_fileobject._sock
85        self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
86        fileobject = httpresponse.fp
87        self.assertIs(fileobject.__class__, socket._fileobject)
88
89        self.assertTrue(not fileobject.closed)
90        response.close()
91        self.assertTrue(fileobject.closed)
92
93class OtherNetworkTests(unittest.TestCase):
94    def setUp(self):
95        if 0:  # for debugging
96            import logging
97            logger = logging.getLogger("test_urllib2net")
98            logger.addHandler(logging.StreamHandler())
99
100    # XXX The rest of these tests aren't very good -- they don't check much.
101    # They do sometimes catch some major disasters, though.
102
103    def test_ftp(self):
104        urls = [
105            'ftp://ftp.debian.org/debian/README',
106            ('ftp://ftp.debian.org/debian/non-existent-file',
107             None, urllib2.URLError),
108            ]
109        self._test_urls(urls, self._extra_handlers())
110
111    def test_file(self):
112        TESTFN = test_support.TESTFN
113        f = open(TESTFN, 'w')
114        try:
115            f.write('hi there\n')
116            f.close()
117            urls = [
118                'file:'+sanepathname2url(os.path.abspath(TESTFN)),
119                ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
120                ]
121            self._test_urls(urls, self._extra_handlers(), retry=True)
122        finally:
123            os.remove(TESTFN)
124
125        self.assertRaises(ValueError, urllib2.urlopen,'./relative_path/to/file')
126
127    # XXX Following test depends on machine configurations that are internal
128    # to CNRI.  Need to set up a public server with the right authentication
129    # configuration for test purposes.
130
131##     def test_cnri(self):
132##         if socket.gethostname() == 'bitdiddle':
133##             localhost = 'bitdiddle.cnri.reston.va.us'
134##         elif socket.gethostname() == 'bitdiddle.concentric.net':
135##             localhost = 'localhost'
136##         else:
137##             localhost = None
138##         if localhost is not None:
139##             urls = [
140##                 'file://%s/etc/passwd' % localhost,
141##                 'http://%s/simple/' % localhost,
142##                 'http://%s/digest/' % localhost,
143##                 'http://%s/not/found.h' % localhost,
144##                 ]
145
146##             bauth = HTTPBasicAuthHandler()
147##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
148##                                'password')
149##             dauth = HTTPDigestAuthHandler()
150##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
151##                                'password')
152
153##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
154
155    def test_urlwithfrag(self):
156        urlwith_frag = "http://www.pythontest.net/index.html#frag"
157        with test_support.transient_internet(urlwith_frag):
158            req = urllib2.Request(urlwith_frag)
159            res = urllib2.urlopen(req)
160            self.assertEqual(res.geturl(),
161                    "http://www.pythontest.net/index.html#frag")
162
163    def test_fileno(self):
164        req = urllib2.Request("http://www.example.com")
165        opener = urllib2.build_opener()
166        res = opener.open(req)
167        try:
168            res.fileno()
169        except AttributeError:
170            self.fail("HTTPResponse object should return a valid fileno")
171        finally:
172            res.close()
173
174    def test_custom_headers(self):
175        url = "http://www.example.com"
176        with test_support.transient_internet(url):
177            opener = urllib2.build_opener()
178            request = urllib2.Request(url)
179            self.assertFalse(request.header_items())
180            opener.open(request)
181            self.assertTrue(request.header_items())
182            self.assertTrue(request.has_header('User-agent'))
183            request.add_header('User-Agent','Test-Agent')
184            opener.open(request)
185            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
186
187    def test_sites_no_connection_close(self):
188        # Some sites do not send Connection: close header.
189        # Verify that those work properly. (#issue12576)
190
191        URL = 'http://www.imdb.com' # No Connection:close
192        with test_support.transient_internet(URL):
193            req = urllib2.urlopen(URL)
194            res = req.read()
195            self.assertTrue(res)
196
197    def _test_urls(self, urls, handlers, retry=True):
198        import time
199        import logging
200        debug = logging.getLogger("test_urllib2").debug
201
202        urlopen = urllib2.build_opener(*handlers).open
203        if retry:
204            urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
205
206        for url in urls:
207            if isinstance(url, tuple):
208                url, req, expected_err = url
209            else:
210                req = expected_err = None
211            with test_support.transient_internet(url):
212                debug(url)
213                try:
214                    f = urlopen(url, req, TIMEOUT)
215                except EnvironmentError as err:
216                    debug(err)
217                    if expected_err:
218                        msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
219                               (expected_err, url, req, type(err), err))
220                        self.assertIsInstance(err, expected_err, msg)
221                except urllib2.URLError as err:
222                    if isinstance(err[0], socket.timeout):
223                        print >>sys.stderr, "<timeout: %s>" % url
224                        continue
225                    else:
226                        raise
227                else:
228                    try:
229                        with test_support.transient_internet(url):
230                            buf = f.read()
231                            debug("read %d bytes" % len(buf))
232                    except socket.timeout:
233                        print >>sys.stderr, "<timeout: %s>" % url
234                    f.close()
235            debug("******** next url coming up...")
236            time.sleep(0.1)
237
238    def _extra_handlers(self):
239        handlers = []
240
241        cfh = urllib2.CacheFTPHandler()
242        self.addCleanup(cfh.clear_cache)
243        cfh.setTimeout(1)
244        handlers.append(cfh)
245
246        return handlers
247
248
249class TimeoutTest(unittest.TestCase):
250    def test_http_basic(self):
251        self.assertIsNone(socket.getdefaulttimeout())
252        url = "http://www.example.com"
253        with test_support.transient_internet(url, timeout=None):
254            u = _urlopen_with_retry(url)
255            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
256
257    def test_http_default_timeout(self):
258        self.assertIsNone(socket.getdefaulttimeout())
259        url = "http://www.example.com"
260        with test_support.transient_internet(url):
261            socket.setdefaulttimeout(60)
262            try:
263                u = _urlopen_with_retry(url)
264            finally:
265                socket.setdefaulttimeout(None)
266            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
267
268    def test_http_no_timeout(self):
269        self.assertIsNone(socket.getdefaulttimeout())
270        url = "http://www.example.com"
271        with test_support.transient_internet(url):
272            socket.setdefaulttimeout(60)
273            try:
274                u = _urlopen_with_retry(url, timeout=None)
275            finally:
276                socket.setdefaulttimeout(None)
277            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
278
279    def test_http_timeout(self):
280        url = "http://www.example.com"
281        with test_support.transient_internet(url):
282            u = _urlopen_with_retry(url, timeout=120)
283            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
284
285    FTP_HOST = 'ftp://ftp.debian.org/debian/'
286
287    def test_ftp_basic(self):
288        self.assertIsNone(socket.getdefaulttimeout())
289        with test_support.transient_internet(self.FTP_HOST, timeout=None):
290            u = _urlopen_with_retry(self.FTP_HOST)
291            self.assertIsNone(u.fp.fp._sock.gettimeout())
292
293    def test_ftp_default_timeout(self):
294        self.assertIsNone(socket.getdefaulttimeout())
295        with test_support.transient_internet(self.FTP_HOST):
296            socket.setdefaulttimeout(60)
297            try:
298                u = _urlopen_with_retry(self.FTP_HOST)
299            finally:
300                socket.setdefaulttimeout(None)
301            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
302
303    def test_ftp_no_timeout(self):
304        self.assertIsNone(socket.getdefaulttimeout(),)
305        with test_support.transient_internet(self.FTP_HOST):
306            socket.setdefaulttimeout(60)
307            try:
308                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
309            finally:
310                socket.setdefaulttimeout(None)
311            self.assertIsNone(u.fp.fp._sock.gettimeout())
312
313    def test_ftp_timeout(self):
314        with test_support.transient_internet(self.FTP_HOST):
315            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
316            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
317
318
319def test_main():
320    test_support.requires("network")
321    test_support.run_unittest(AuthTests,
322                              OtherNetworkTests,
323                              CloseSocketTest,
324                              TimeoutTest,
325                              )
326
327if __name__ == "__main__":
328    test_main()
329