1import unittest
2from test import support
3from test.test_urllib2 import sanepathname2url
4
5import os
6import socket
7import urllib.error
8import urllib.request
9import sys
10
11support.requires("network")
12
13TIMEOUT = 60  # seconds
14
15
16def _retry_thrice(func, exc, *args, **kwargs):
17    for i in range(3):
18        try:
19            return func(*args, **kwargs)
20        except exc as e:
21            last_exc = e
22            continue
23    raise last_exc
24
25def _wrap_with_retry_thrice(func, exc):
26    def wrapped(*args, **kwargs):
27        return _retry_thrice(func, exc, *args, **kwargs)
28    return wrapped
29
30# Connecting to remote hosts is flaky.  Make it more robust by retrying
31# the connection several times.
32_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
33                                              urllib.error.URLError)
34
35
36class AuthTests(unittest.TestCase):
37    """Tests urllib2 authentication features."""
38
39## Disabled at the moment since there is no page under python.org which
40## could be used to HTTP authentication.
41#
42#    def test_basic_auth(self):
43#        import http.client
44#
45#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
46#        test_hostport = "www.python.org"
47#        test_realm = 'Test Realm'
48#        test_user = 'test.test_urllib2net'
49#        test_password = 'blah'
50#
51#        # failure
52#        try:
53#            _urlopen_with_retry(test_url)
54#        except urllib2.HTTPError, exc:
55#            self.assertEqual(exc.code, 401)
56#        else:
57#            self.fail("urlopen() should have failed with 401")
58#
59#        # success
60#        auth_handler = urllib2.HTTPBasicAuthHandler()
61#        auth_handler.add_password(test_realm, test_hostport,
62#                                  test_user, test_password)
63#        opener = urllib2.build_opener(auth_handler)
64#        f = opener.open('http://localhost/')
65#        response = _urlopen_with_retry("http://www.python.org/")
66#
67#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
68#        # reasons, let's not implement it!  (it's already implemented for proxy
69#        # specification strings (that is, URLs or authorities specifying a
70#        # proxy), so we must keep that)
71#        self.assertRaises(http.client.InvalidURL,
72#                          urllib2.urlopen, "http://evil:thing@example.com")
73
74
75class CloseSocketTest(unittest.TestCase):
76
77    def test_close(self):
78        # calling .close() on urllib2's response objects should close the
79        # underlying socket
80        url = "http://www.example.com/"
81        with support.transient_internet(url):
82            response = _urlopen_with_retry(url)
83            sock = response.fp
84            self.assertFalse(sock.closed)
85            response.close()
86            self.assertTrue(sock.closed)
87
88class OtherNetworkTests(unittest.TestCase):
89    def setUp(self):
90        if 0:  # for debugging
91            import logging
92            logger = logging.getLogger("test_urllib2net")
93            logger.addHandler(logging.StreamHandler())
94
95    # XXX The rest of these tests aren't very good -- they don't check much.
96    # They do sometimes catch some major disasters, though.
97
98    def test_ftp(self):
99        urls = [
100            'ftp://ftp.debian.org/debian/README',
101            ('ftp://ftp.debian.org/debian/non-existent-file',
102             None, urllib.error.URLError),
103            ]
104        self._test_urls(urls, self._extra_handlers())
105
106    def test_file(self):
107        TESTFN = support.TESTFN
108        f = open(TESTFN, 'w')
109        try:
110            f.write('hi there\n')
111            f.close()
112            urls = [
113                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
114                ('file:///nonsensename/etc/passwd', None,
115                 urllib.error.URLError),
116                ]
117            self._test_urls(urls, self._extra_handlers(), retry=True)
118        finally:
119            os.remove(TESTFN)
120
121        self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file')
122
123    # XXX Following test depends on machine configurations that are internal
124    # to CNRI.  Need to set up a public server with the right authentication
125    # configuration for test purposes.
126
127##     def test_cnri(self):
128##         if socket.gethostname() == 'bitdiddle':
129##             localhost = 'bitdiddle.cnri.reston.va.us'
130##         elif socket.gethostname() == 'bitdiddle.concentric.net':
131##             localhost = 'localhost'
132##         else:
133##             localhost = None
134##         if localhost is not None:
135##             urls = [
136##                 'file://%s/etc/passwd' % localhost,
137##                 'http://%s/simple/' % localhost,
138##                 'http://%s/digest/' % localhost,
139##                 'http://%s/not/found.h' % localhost,
140##                 ]
141
142##             bauth = HTTPBasicAuthHandler()
143##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
144##                                'password')
145##             dauth = HTTPDigestAuthHandler()
146##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
147##                                'password')
148
149##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
150
151    def test_urlwithfrag(self):
152        urlwith_frag = "http://www.pythontest.net/index.html#frag"
153        with support.transient_internet(urlwith_frag):
154            req = urllib.request.Request(urlwith_frag)
155            res = urllib.request.urlopen(req)
156            self.assertEqual(res.geturl(),
157                    "http://www.pythontest.net/index.html#frag")
158
159    def test_redirect_url_withfrag(self):
160        redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
161        with support.transient_internet(redirect_url_with_frag):
162            req = urllib.request.Request(redirect_url_with_frag)
163            res = urllib.request.urlopen(req)
164            self.assertEqual(res.geturl(),
165                    "http://www.pythontest.net/elsewhere/#frag")
166
167    def test_custom_headers(self):
168        url = "http://www.example.com"
169        with support.transient_internet(url):
170            opener = urllib.request.build_opener()
171            request = urllib.request.Request(url)
172            self.assertFalse(request.header_items())
173            opener.open(request)
174            self.assertTrue(request.header_items())
175            self.assertTrue(request.has_header('User-agent'))
176            request.add_header('User-Agent','Test-Agent')
177            opener.open(request)
178            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
179
180    def test_sites_no_connection_close(self):
181        # Some sites do not send Connection: close header.
182        # Verify that those work properly. (#issue12576)
183
184        URL = 'http://www.imdb.com' # mangles Connection:close
185
186        with support.transient_internet(URL):
187            try:
188                with urllib.request.urlopen(URL) as res:
189                    pass
190            except ValueError as e:
191                self.fail("urlopen failed for site not sending \
192                           Connection:close")
193            else:
194                self.assertTrue(res)
195
196            req = urllib.request.urlopen(URL)
197            res = req.read()
198            self.assertTrue(res)
199
200    def _test_urls(self, urls, handlers, retry=True):
201        import time
202        import logging
203        debug = logging.getLogger("test_urllib2").debug
204
205        urlopen = urllib.request.build_opener(*handlers).open
206        if retry:
207            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)
208
209        for url in urls:
210            with self.subTest(url=url):
211                if isinstance(url, tuple):
212                    url, req, expected_err = url
213                else:
214                    req = expected_err = None
215
216                with support.transient_internet(url):
217                    try:
218                        f = urlopen(url, req, TIMEOUT)
219                    # urllib.error.URLError is a subclass of OSError
220                    except OSError as err:
221                        if expected_err:
222                            msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
223                                   (expected_err, url, req, type(err), err))
224                            self.assertIsInstance(err, expected_err, msg)
225                        else:
226                            raise
227                    else:
228                        try:
229                            with support.time_out, \
230                                 support.socket_peer_reset, \
231                                 support.ioerror_peer_reset:
232                                buf = f.read()
233                                debug("read %d bytes" % len(buf))
234                        except socket.timeout:
235                            print("<timeout: %s>" % url, file=sys.stderr)
236                        f.close()
237                time.sleep(0.1)
238
239    def _extra_handlers(self):
240        handlers = []
241
242        cfh = urllib.request.CacheFTPHandler()
243        self.addCleanup(cfh.clear_cache)
244        cfh.setTimeout(1)
245        handlers.append(cfh)
246
247        return handlers
248
249
250class TimeoutTest(unittest.TestCase):
251    def test_http_basic(self):
252        self.assertIsNone(socket.getdefaulttimeout())
253        url = "http://www.example.com"
254        with support.transient_internet(url, timeout=None):
255            u = _urlopen_with_retry(url)
256            self.addCleanup(u.close)
257            self.assertIsNone(u.fp.raw._sock.gettimeout())
258
259    def test_http_default_timeout(self):
260        self.assertIsNone(socket.getdefaulttimeout())
261        url = "http://www.example.com"
262        with support.transient_internet(url):
263            socket.setdefaulttimeout(60)
264            try:
265                u = _urlopen_with_retry(url)
266                self.addCleanup(u.close)
267            finally:
268                socket.setdefaulttimeout(None)
269            self.assertEqual(u.fp.raw._sock.gettimeout(), 60)
270
271    def test_http_no_timeout(self):
272        self.assertIsNone(socket.getdefaulttimeout())
273        url = "http://www.example.com"
274        with support.transient_internet(url):
275            socket.setdefaulttimeout(60)
276            try:
277                u = _urlopen_with_retry(url, timeout=None)
278                self.addCleanup(u.close)
279            finally:
280                socket.setdefaulttimeout(None)
281            self.assertIsNone(u.fp.raw._sock.gettimeout())
282
283    def test_http_timeout(self):
284        url = "http://www.example.com"
285        with support.transient_internet(url):
286            u = _urlopen_with_retry(url, timeout=120)
287            self.addCleanup(u.close)
288            self.assertEqual(u.fp.raw._sock.gettimeout(), 120)
289
290    FTP_HOST = 'ftp://ftp.debian.org/debian/'
291
292    def test_ftp_basic(self):
293        self.assertIsNone(socket.getdefaulttimeout())
294        with support.transient_internet(self.FTP_HOST, timeout=None):
295            u = _urlopen_with_retry(self.FTP_HOST)
296            self.addCleanup(u.close)
297            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
298
299    def test_ftp_default_timeout(self):
300        self.assertIsNone(socket.getdefaulttimeout())
301        with support.transient_internet(self.FTP_HOST):
302            socket.setdefaulttimeout(60)
303            try:
304                u = _urlopen_with_retry(self.FTP_HOST)
305                self.addCleanup(u.close)
306            finally:
307                socket.setdefaulttimeout(None)
308            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
309
310    def test_ftp_no_timeout(self):
311        self.assertIsNone(socket.getdefaulttimeout())
312        with support.transient_internet(self.FTP_HOST):
313            socket.setdefaulttimeout(60)
314            try:
315                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
316                self.addCleanup(u.close)
317            finally:
318                socket.setdefaulttimeout(None)
319            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
320
321    def test_ftp_timeout(self):
322        with support.transient_internet(self.FTP_HOST):
323            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
324            self.addCleanup(u.close)
325            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
326
327
328if __name__ == "__main__":
329    unittest.main()
330