#!/usr/bin/env python
"""Tests for urllib2 that talk to real remote hosts.

test_main() requires the "network" test resource before running anything.
"""

import unittest
from test import test_support
from test.test_urllib2 import sanepathname2url

import socket
import urllib2
import os
import sys

TIMEOUT = 60  # seconds


def _retry_thrice(func, exc, *args, **kwargs):
    """Call func(*args, **kwargs), making at most three attempts.

    An attempt that raises `exc` is retried; any other exception propagates
    immediately.  Returns the result of the first successful attempt.  If
    all three attempts raise `exc`, the exception from the last attempt is
    raised.
    """
    last_exc = None
    for _ in range(3):
        try:
            return func(*args, **kwargs)
        except exc as err:
            # Remember the failure so it can be re-raised once we give up.
            last_exc = err
    raise last_exc


def _wrap_with_retry_thrice(func, exc):
    """Return a callable that runs `func` through _retry_thrice on `exc`."""
    def wrapped(*args, **kwargs):
        return _retry_thrice(func, exc, *args, **kwargs)
    return wrapped

# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)


class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features."""

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
#
#    def test_basic_auth(self):
#        import httplib
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(httplib.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")


class CloseSocketTest(unittest.TestCase):

    def test_close(self):
        import httplib

        # calling .close() on urllib2's response objects should close the
        # underlying socket
        url = "http://www.python.org/"
        with test_support.transient_internet(url):
            # delve deep into response to fetch socket._socketobject
            response = _urlopen_with_retry(url)
            # Make sure the response is released even if an assertion below
            # fails before the explicit close().
            self.addCleanup(response.close)
            abused_fileobject = response.fp
            self.assertIs(abused_fileobject.__class__, socket._fileobject)
            httpresponse = abused_fileobject._sock
            self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
            fileobject = httpresponse.fp
            self.assertIs(fileobject.__class__, socket._fileobject)

            self.assertFalse(fileobject.closed)
            response.close()
            self.assertTrue(fileobject.closed)


class OtherNetworkTests(unittest.TestCase):
    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    def test_ftp(self):
        urls = [
            'ftp://ftp.kernel.org/pub/linux/kernel/README',
            'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file',
            #'ftp://ftp.kernel.org/pub/leenox/kernel/test',
            'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
                '/research-reports/00README-Legal-Rules-Regs',
            ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        TESTFN = test_support.TESTFN
        try:
            # The with-statement closes the handle even if write() fails.
            with open(TESTFN, 'w') as f:
                f.write('hi there\n')
            urls = [
                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
                ]
            self._test_urls(urls, self._extra_handlers(), retry=True)
        finally:
            # unlink() tolerates the file being absent (e.g. open() failed).
            test_support.unlink(TESTFN)

        self.assertRaises(ValueError, urllib2.urlopen,
                          './relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]

##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')

##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        # geturl() on the response should still carry the '#fragment' part.
        urlwith_frag = "http://docs.python.org/2/glossary.html#glossary"
        with test_support.transient_internet(urlwith_frag):
            req = urllib2.Request(urlwith_frag)
            res = urllib2.urlopen(req)
            self.addCleanup(res.close)
            self.assertEqual(res.geturl(),
                    "http://docs.python.org/2/glossary.html#glossary")

    def test_fileno(self):
        url = "http://www.python.org"
        with test_support.transient_internet(url):
            req = urllib2.Request(url)
            opener = urllib2.build_opener()
            res = opener.open(req)
            try:
                res.fileno()
            except AttributeError:
                self.fail("HTTPResponse object should return a valid fileno")
            finally:
                res.close()

    def test_custom_headers(self):
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            opener = urllib2.build_opener()
            request = urllib2.Request(url)
            self.assertFalse(request.header_items())
            # Only the request's headers are inspected, so each response
            # can be closed straight away.
            opener.open(request).close()
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent', 'Test-Agent')
            opener.open(request).close()
            self.assertEqual(request.get_header('User-agent'), 'Test-Agent')

    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com'  # No Connection:close
        with test_support.transient_internet(URL):
            response = urllib2.urlopen(URL)
            self.addCleanup(response.close)
            data = response.read()
            self.assertTrue(data)

    def _test_urls(self, urls, handlers, retry=True):
        """Open each entry of `urls` with an opener built from `handlers`.

        Each entry is either a URL string or a tuple
        (url, request_data, expected_error).  When expected_error is given
        and opening raises an EnvironmentError, the error must be an
        instance of expected_error.  An EnvironmentError raised for an entry
        without expected_error is only logged.  Successfully opened URLs are
        read to the end and closed.

        When `retry` is true, opening is retried (via
        _wrap_with_retry_thrice) on urllib2.URLError.
        """
        import time
        import logging
        # Same logger name that setUp() attaches its debugging handler to.
        debug = logging.getLogger("test_urllib2net").debug

        urlopen = urllib2.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)

        for url in urls:
            if isinstance(url, tuple):
                url, req, expected_err = url
            else:
                req = expected_err = None
            with test_support.transient_internet(url):
                debug(url)
                try:
                    f = urlopen(url, req, TIMEOUT)
                except EnvironmentError as err:
                    # urllib2.URLError derives from IOError and therefore
                    # from EnvironmentError, so URLError-wrapped timeouts
                    # arrive here; a separate "except URLError" clause after
                    # this one would never run.
                    debug(err)
                    if expected_err:
                        msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                               (expected_err, url, req, type(err), err))
                        self.assertIsInstance(err, expected_err, msg)
                    elif (isinstance(err, urllib2.URLError) and err.args and
                          isinstance(err.args[0], socket.timeout)):
                        print >>sys.stderr, "<timeout: %s>" % url
                else:
                    try:
                        with test_support.transient_internet(url):
                            buf = f.read()
                            debug("read %d bytes" % len(buf))
                    except socket.timeout:
                        print >>sys.stderr, "<timeout: %s>" % url
                    finally:
                        # Close the response whatever read() did.
                        f.close()
            debug("******** next url coming up...")
            time.sleep(0.1)

    def _extra_handlers(self):
        """Return the extra urllib2 handlers used by the URL tests.

        Currently a single CacheFTPHandler with its timeout set to 1; its
        cache is cleared through a registered cleanup.
        """
        handlers = []

        cfh = urllib2.CacheFTPHandler()
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers


class TimeoutTest(unittest.TestCase):
    # Each test opens a URL and checks the timeout set on the underlying
    # socket, reached through the response's private attributes.  Responses
    # are closed through addCleanup so they are released even when an
    # assertion fails.

    def test_http_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.python.org"
        with test_support.transient_internet(url, timeout=None):
            u = _urlopen_with_retry(url)
            self.addCleanup(u.close)
            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())

    def test_http_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.python.org"
        with test_support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)

    def test_http_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.python.org"
        with test_support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url, timeout=None)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())

    def test_http_timeout(self):
        url = "http://www.python.org"
        with test_support.transient_internet(url):
            u = _urlopen_with_retry(url, timeout=120)
            self.addCleanup(u.close)
            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)

    FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"

    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST, timeout=None):
            u = _urlopen_with_retry(self.FTP_HOST)
            self.addCleanup(u.close)
            self.assertIsNone(u.fp.fp._sock.gettimeout())

    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)

    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.fp._sock.gettimeout())

    def test_ftp_timeout(self):
        with test_support.transient_internet(self.FTP_HOST):
            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
            self.addCleanup(u.close)
            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)


def test_main():
    """Run every test class in this module; requires the "network" resource."""
    test_support.requires("network")
    test_support.run_unittest(AuthTests,
                              OtherNetworkTests,
                              CloseSocketTest,
                              TimeoutTest,
                              )

if __name__ == "__main__":
    test_main()