1import unittest 2from test import test_support 3from test.test_urllib2 import sanepathname2url 4 5import socket 6import urllib2 7import os 8import sys 9 10TIMEOUT = 60 # seconds 11 12 13def _retry_thrice(func, exc, *args, **kwargs): 14 for i in range(3): 15 try: 16 return func(*args, **kwargs) 17 except exc, last_exc: 18 continue 19 except: 20 raise 21 raise last_exc 22 23def _wrap_with_retry_thrice(func, exc): 24 def wrapped(*args, **kwargs): 25 return _retry_thrice(func, exc, *args, **kwargs) 26 return wrapped 27 28# Connecting to remote hosts is flaky. Make it more robust by retrying 29# the connection several times. 30_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError) 31 32 33class AuthTests(unittest.TestCase): 34 """Tests urllib2 authentication features.""" 35 36## Disabled at the moment since there is no page under python.org which 37## could be used to HTTP authentication. 38# 39# def test_basic_auth(self): 40# import httplib 41# 42# test_url = "http://www.python.org/test/test_urllib2/basic_auth" 43# test_hostport = "www.python.org" 44# test_realm = 'Test Realm' 45# test_user = 'test.test_urllib2net' 46# test_password = 'blah' 47# 48# # failure 49# try: 50# _urlopen_with_retry(test_url) 51# except urllib2.HTTPError, exc: 52# self.assertEqual(exc.code, 401) 53# else: 54# self.fail("urlopen() should have failed with 401") 55# 56# # success 57# auth_handler = urllib2.HTTPBasicAuthHandler() 58# auth_handler.add_password(test_realm, test_hostport, 59# test_user, test_password) 60# opener = urllib2.build_opener(auth_handler) 61# f = opener.open('http://localhost/') 62# response = _urlopen_with_retry("http://www.python.org/") 63# 64# # The 'userinfo' URL component is deprecated by RFC 3986 for security 65# # reasons, let's not implement it! (it's already implemented for proxy 66# # specification strings (that is, URLs or authorities specifying a 67# # proxy), so we must keep that) 68# self.assertRaises(httplib.InvalidURL, 69# urllib2.urlopen, "http://evil:thing@example.com") 70 71 72class CloseSocketTest(unittest.TestCase): 73 74 def test_close(self): 75 import httplib 76 77 # calling .close() on urllib2's response objects should close the 78 # underlying socket 79 80 # delve deep into response to fetch socket._socketobject 81 response = _urlopen_with_retry("http://www.example.com/") 82 abused_fileobject = response.fp 83 self.assertIs(abused_fileobject.__class__, socket._fileobject) 84 httpresponse = abused_fileobject._sock 85 self.assertIs(httpresponse.__class__, httplib.HTTPResponse) 86 fileobject = httpresponse.fp 87 self.assertIs(fileobject.__class__, socket._fileobject) 88 89 self.assertTrue(not fileobject.closed) 90 response.close() 91 self.assertTrue(fileobject.closed) 92 93class OtherNetworkTests(unittest.TestCase): 94 def setUp(self): 95 if 0: # for debugging 96 import logging 97 logger = logging.getLogger("test_urllib2net") 98 logger.addHandler(logging.StreamHandler()) 99 100 # XXX The rest of these tests aren't very good -- they don't check much. 101 # They do sometimes catch some major disasters, though. 102 103 def test_ftp(self): 104 urls = [ 105 'ftp://ftp.debian.org/debian/README', 106 ('ftp://ftp.debian.org/debian/non-existent-file', 107 None, urllib2.URLError), 108 ] 109 self._test_urls(urls, self._extra_handlers()) 110 111 def test_file(self): 112 TESTFN = test_support.TESTFN 113 f = open(TESTFN, 'w') 114 try: 115 f.write('hi there\n') 116 f.close() 117 urls = [ 118 'file:'+sanepathname2url(os.path.abspath(TESTFN)), 119 ('file:///nonsensename/etc/passwd', None, urllib2.URLError), 120 ] 121 self._test_urls(urls, self._extra_handlers(), retry=True) 122 finally: 123 os.remove(TESTFN) 124 125 self.assertRaises(ValueError, urllib2.urlopen,'./relative_path/to/file') 126 127 # XXX Following test depends on machine configurations that are internal 128 # to CNRI. Need to set up a public server with the right authentication 129 # configuration for test purposes. 130 131## def test_cnri(self): 132## if socket.gethostname() == 'bitdiddle': 133## localhost = 'bitdiddle.cnri.reston.va.us' 134## elif socket.gethostname() == 'bitdiddle.concentric.net': 135## localhost = 'localhost' 136## else: 137## localhost = None 138## if localhost is not None: 139## urls = [ 140## 'file://%s/etc/passwd' % localhost, 141## 'http://%s/simple/' % localhost, 142## 'http://%s/digest/' % localhost, 143## 'http://%s/not/found.h' % localhost, 144## ] 145 146## bauth = HTTPBasicAuthHandler() 147## bauth.add_password('basic_test_realm', localhost, 'jhylton', 148## 'password') 149## dauth = HTTPDigestAuthHandler() 150## dauth.add_password('digest_test_realm', localhost, 'jhylton', 151## 'password') 152 153## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) 154 155 def test_urlwithfrag(self): 156 urlwith_frag = "http://www.pythontest.net/index.html#frag" 157 with test_support.transient_internet(urlwith_frag): 158 req = urllib2.Request(urlwith_frag) 159 res = urllib2.urlopen(req) 160 self.assertEqual(res.geturl(), 161 "http://www.pythontest.net/index.html#frag") 162 163 def test_fileno(self): 164 req = urllib2.Request("http://www.example.com") 165 opener = urllib2.build_opener() 166 res = opener.open(req) 167 try: 168 res.fileno() 169 except AttributeError: 170 self.fail("HTTPResponse object should return a valid fileno") 171 finally: 172 res.close() 173 174 def test_custom_headers(self): 175 url = "http://www.example.com" 176 with test_support.transient_internet(url): 177 opener = urllib2.build_opener() 178 request = urllib2.Request(url) 179 self.assertFalse(request.header_items()) 180 opener.open(request) 181 self.assertTrue(request.header_items()) 182 self.assertTrue(request.has_header('User-agent')) 183 request.add_header('User-Agent','Test-Agent') 184 opener.open(request) 185 self.assertEqual(request.get_header('User-agent'),'Test-Agent') 186 187 def test_sites_no_connection_close(self): 188 # Some sites do not send Connection: close header. 189 # Verify that those work properly. (#issue12576) 190 191 URL = 'http://www.imdb.com' # No Connection:close 192 with test_support.transient_internet(URL): 193 req = urllib2.urlopen(URL) 194 res = req.read() 195 self.assertTrue(res) 196 197 def _test_urls(self, urls, handlers, retry=True): 198 import time 199 import logging 200 debug = logging.getLogger("test_urllib2").debug 201 202 urlopen = urllib2.build_opener(*handlers).open 203 if retry: 204 urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError) 205 206 for url in urls: 207 if isinstance(url, tuple): 208 url, req, expected_err = url 209 else: 210 req = expected_err = None 211 with test_support.transient_internet(url): 212 debug(url) 213 try: 214 f = urlopen(url, req, TIMEOUT) 215 except EnvironmentError as err: 216 debug(err) 217 if expected_err: 218 msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" % 219 (expected_err, url, req, type(err), err)) 220 self.assertIsInstance(err, expected_err, msg) 221 except urllib2.URLError as err: 222 if isinstance(err[0], socket.timeout): 223 print >>sys.stderr, "<timeout: %s>" % url 224 continue 225 else: 226 raise 227 else: 228 try: 229 with test_support.transient_internet(url): 230 buf = f.read() 231 debug("read %d bytes" % len(buf)) 232 except socket.timeout: 233 print >>sys.stderr, "<timeout: %s>" % url 234 f.close() 235 debug("******** next url coming up...") 236 time.sleep(0.1) 237 238 def _extra_handlers(self): 239 handlers = [] 240 241 cfh = urllib2.CacheFTPHandler() 242 self.addCleanup(cfh.clear_cache) 243 cfh.setTimeout(1) 244 handlers.append(cfh) 245 246 return handlers 247 248 249class TimeoutTest(unittest.TestCase): 250 def test_http_basic(self): 251 self.assertIsNone(socket.getdefaulttimeout()) 252 url = "http://www.example.com" 253 with test_support.transient_internet(url, timeout=None): 254 u = _urlopen_with_retry(url) 255 self.assertIsNone(u.fp._sock.fp._sock.gettimeout()) 256 257 def test_http_default_timeout(self): 258 self.assertIsNone(socket.getdefaulttimeout()) 259 url = "http://www.example.com" 260 with test_support.transient_internet(url): 261 socket.setdefaulttimeout(60) 262 try: 263 u = _urlopen_with_retry(url) 264 finally: 265 socket.setdefaulttimeout(None) 266 self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60) 267 268 def test_http_no_timeout(self): 269 self.assertIsNone(socket.getdefaulttimeout()) 270 url = "http://www.example.com" 271 with test_support.transient_internet(url): 272 socket.setdefaulttimeout(60) 273 try: 274 u = _urlopen_with_retry(url, timeout=None) 275 finally: 276 socket.setdefaulttimeout(None) 277 self.assertIsNone(u.fp._sock.fp._sock.gettimeout()) 278 279 def test_http_timeout(self): 280 url = "http://www.example.com" 281 with test_support.transient_internet(url): 282 u = _urlopen_with_retry(url, timeout=120) 283 self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120) 284 285 FTP_HOST = 'ftp://ftp.debian.org/debian/' 286 287 def test_ftp_basic(self): 288 self.assertIsNone(socket.getdefaulttimeout()) 289 with test_support.transient_internet(self.FTP_HOST, timeout=None): 290 u = _urlopen_with_retry(self.FTP_HOST) 291 self.assertIsNone(u.fp.fp._sock.gettimeout()) 292 293 def test_ftp_default_timeout(self): 294 self.assertIsNone(socket.getdefaulttimeout()) 295 with test_support.transient_internet(self.FTP_HOST): 296 socket.setdefaulttimeout(60) 297 try: 298 u = _urlopen_with_retry(self.FTP_HOST) 299 finally: 300 socket.setdefaulttimeout(None) 301 self.assertEqual(u.fp.fp._sock.gettimeout(), 60) 302 303 def test_ftp_no_timeout(self): 304 self.assertIsNone(socket.getdefaulttimeout(),) 305 with test_support.transient_internet(self.FTP_HOST): 306 socket.setdefaulttimeout(60) 307 try: 308 u = _urlopen_with_retry(self.FTP_HOST, timeout=None) 309 finally: 310 socket.setdefaulttimeout(None) 311 self.assertIsNone(u.fp.fp._sock.gettimeout()) 312 313 def test_ftp_timeout(self): 314 with test_support.transient_internet(self.FTP_HOST): 315 u = _urlopen_with_retry(self.FTP_HOST, timeout=60) 316 self.assertEqual(u.fp.fp._sock.gettimeout(), 60) 317 318 319def test_main(): 320 test_support.requires("network") 321 test_support.run_unittest(AuthTests, 322 OtherNetworkTests, 323 CloseSocketTest, 324 TimeoutTest, 325 ) 326 327if __name__ == "__main__": 328 test_main() 329