"""Unittests for the various HTTPServer modules.

Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
"""

from http.server import BaseHTTPRequestHandler, HTTPServer, \
     SimpleHTTPRequestHandler, CGIHTTPRequestHandler
from http import server, HTTPStatus

import os
import sys
import re
import base64
import ntpath
import shutil
import urllib.parse
import html
import http.client
import tempfile
import time
from io import BytesIO

import unittest
from test import support
threading = support.import_module('threading')


class NoLogRequestHandler:
    """Mixin that silences request logging for the handlers under test."""

    def log_message(self, *args):
        # don't write log messages to stderr
        pass

    def read(self, n=None):
        return ''


class TestServerThread(threading.Thread):
    """Background thread running an HTTPServer for a single test.

    The server binds to an OS-assigned port on localhost; the bound address
    is published on the owning test object as ``HOST`` and ``PORT`` before
    its ``server_started`` event is set.
    """

    def __init__(self, test_object, request_handler):
        threading.Thread.__init__(self)
        self.request_handler = request_handler
        self.test_object = test_object

    def run(self):
        self.server = HTTPServer(('localhost', 0), self.request_handler)
        self.test_object.HOST, self.test_object.PORT = \
            self.server.socket.getsockname()
        self.test_object.server_started.set()
        # Drop the reference to the test case once the address is published.
        self.test_object = None
        try:
            self.server.serve_forever(0.05)
        finally:
            self.server.server_close()

    def stop(self):
        """Ask serve_forever() to return; run() then closes the socket."""
        self.server.shutdown()


class BaseTestCase(unittest.TestCase):
    """Common fixture: one live server thread per test.

    Subclasses must define a ``request_handler`` class attribute.
    """

    def setUp(self):
        self._threads = support.threading_setup()
        # Replace os.environ with a guard; tearDown() calls its __exit__().
        os.environ = support.EnvironmentVarGuard()
        self.server_started = threading.Event()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        self.server_started.wait()

    def tearDown(self):
        self.thread.stop()
        self.thread = None
        os.environ.__exit__()
        support.threading_cleanup(*self._threads)

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request to the test server and return the response.

        A fresh HTTPConnection is opened for every call and stored on
        ``self.connection``.  ``headers`` defaults to no extra headers.
        """
        # A None sentinel avoids sharing one mutable dict between calls.
        if headers is None:
            headers = {}
        self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()


class BaseHTTPServerTestCase(BaseTestCase):
    """Exercise BaseHTTPRequestHandler over a real socket connection."""

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_TEST(self):
            self.send_response(HTTPStatus.NO_CONTENT)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

        def do_KEEP(self):
            self.send_response(HTTPStatus.NO_CONTENT)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'keep-alive')
            self.end_headers()

        def do_KEYERROR(self):
            self.send_error(999)

        def do_NOTFOUND(self):
            self.send_error(HTTPStatus.NOT_FOUND)

        def do_EXPLAINERROR(self):
            self.send_error(999, "Short Message",
                            "This is a long \n explanation")

        def do_CUSTOM(self):
            self.send_response(999)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

        def do_LATINONEHEADER(self):
            self.send_response(999)
            self.send_header('X-Special', 'Dängerous Mind')
            self.send_header('Connection', 'close')
            self.end_headers()
            body = self.headers['x-special-incoming'].encode('utf-8')
            self.wfile.write(body)

        def do_SEND_ERROR(self):
            # The status code to send is taken from the request path.
            self.send_error(int(self.path[1:]))

        def do_HEAD(self):
            # The status code to send is taken from the request path.
            self.send_error(int(self.path[1:]))

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        self.con.connect()

    def test_command(self):
        self.con.request('GET', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_request_line_trimming(self):
        self.con._http_vsn_str = 'HTTP/1.1\n'
        self.con.putrequest('XYZBOGUS', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_bogus(self):
        self.con._http_vsn_str = 'FUBAR'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_digits(self):
        self.con._http_vsn_str = 'HTTP/9.9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_none_get(self):
        self.con._http_vsn_str = ''
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_none(self):
        # Test that a valid method is rejected when not HTTP/1.x
        self.con._http_vsn_str = ''
        self.con.putrequest('CUSTOM', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        self.con._http_vsn_str = 'HTTP/9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)

    def test_send_blank(self):
        self.con._http_vsn_str = ''
        self.con.putrequest('', '')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_header_close(self):
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'close')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_header_keep_alive(self):
        self.con._http_vsn_str = 'HTTP/1.1'
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'keep-alive')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_handler(self):
        self.con.request('TEST', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NO_CONTENT)

    def test_return_header_keep_alive(self):
        self.con.request('KEEP', '/')
        res = self.con.getresponse()
        self.assertEqual(res.getheader('Connection'), 'keep-alive')
        self.con.request('TEST', '/')
        self.addCleanup(self.con.close)

    def test_internal_key_error(self):
        self.con.request('KEYERROR', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)

    def test_return_custom_status(self):
        self.con.request('CUSTOM', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)

    def test_return_explain_error(self):
        self.con.request('EXPLAINERROR', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)
        self.assertTrue(int(res.getheader('Content-Length')))

    def test_latin1_header(self):
        self.con.request('LATINONEHEADER', '/', headers={
            'X-Special-Incoming': 'Ärger mit Unicode'
        })
        res = self.con.getresponse()
        self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
        self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))

    def test_error_content_length(self):
        # Issue #16088: standard error responses should have a content-length
        self.con.request('NOTFOUND', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

        data = res.read()
        self.assertEqual(int(res.getheader('Content-Length')), len(data))

    def test_send_error(self):
        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
                                         HTTPStatus.RESET_CONTENT)
        for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
                     HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
                     HTTPStatus.SWITCHING_PROTOCOLS):
            self.con.request('SEND_ERROR', '/{}'.format(code))
            res = self.con.getresponse()
            self.assertEqual(code, res.status)
            self.assertEqual(None, res.getheader('Content-Length'))
            self.assertEqual(None, res.getheader('Content-Type'))
            if code not in allow_transfer_encoding_codes:
                self.assertEqual(None, res.getheader('Transfer-Encoding'))

            data = res.read()
            self.assertEqual(b'', data)

    def test_head_via_send_error(self):
        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
                                         HTTPStatus.RESET_CONTENT)
        for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
                     HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
                     HTTPStatus.SWITCHING_PROTOCOLS):
            self.con.request('HEAD', '/{}'.format(code))
            res = self.con.getresponse()
            self.assertEqual(code, res.status)
            if code == HTTPStatus.OK:
                self.assertTrue(int(res.getheader('Content-Length')) > 0)
                self.assertIn('text/html', res.getheader('Content-Type'))
            else:
                self.assertEqual(None, res.getheader('Content-Length'))
                self.assertEqual(None, res.getheader('Content-Type'))
            if code not in allow_transfer_encoding_codes:
                self.assertEqual(None, res.getheader('Transfer-Encoding'))

            data = res.read()
            self.assertEqual(b'', data)


class RequestHandlerLoggingTestCase(BaseTestCase):
    """Check what BaseHTTPRequestHandler writes to stderr per request."""

    class request_handler(BaseHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_GET(self):
            self.send_response(HTTPStatus.OK)
            self.end_headers()

        def do_ERROR(self):
            self.send_error(HTTPStatus.NOT_FOUND, 'File not found')

    def test_get(self):
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        self.con.connect()

        with support.captured_stderr() as err:
            self.con.request('GET', '/')
            self.con.getresponse()

        self.assertTrue(
            err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n'))

    def test_err(self):
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        self.con.connect()

        with support.captured_stderr() as err:
            self.con.request('ERROR', '/')
            self.con.getresponse()

        lines = err.getvalue().split('\n')
        self.assertTrue(lines[0].endswith('code 404, message File not found'))
        self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -'))


class SimpleHTTPServerTestCase(BaseTestCase):
    """Exercise SimpleHTTPRequestHandler against a temporary directory.

    setUp() changes the working directory to the system temp directory and
    creates a fresh sub-directory there holding a single file named 'test'.
    """

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = b'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        self.base_url = '/' + self.tempdir_name
        with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
            temp.write(self.data)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            try:
                shutil.rmtree(self.tempdir)
            except OSError:
                # Best-effort cleanup: a leftover temp directory must not
                # fail the test, but do not swallow KeyboardInterrupt etc.
                pass
        finally:
            BaseTestCase.tearDown(self)

    def check_status_and_reason(self, response, status, data=None):
        """Read *response*, check its status and return the body.

        If *data* is truthy the body must equal it.  Also asserts that the
        response is HTTP/1.0, that the connection is not persistent, and
        that nothing is left unread on the underlying stream.
        """
        def close_conn():
            """Don't close reader yet so we can check if there was leftover
            buffered input"""
            nonlocal reader
            reader = response.fp
            response.fp = None
        reader = None
        response._close_conn = close_conn

        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertIsNotNone(response.reason)
        if data:
            self.assertEqual(data, body)
        # Ensure the server has not set up a persistent connection, and has
        # not sent any extra data
        self.assertEqual(response.version, 10)
        self.assertEqual(response.msg.get("Connection", "close"), "close")
        self.assertEqual(reader.read(30), b'', 'Connection should be closed')

        reader.close()
        return body

    @support.requires_mac_ver(10, 5)
    @unittest.skipIf(sys.platform == 'win32',
                     'undecodable name cannot be decoded on win32')
    @unittest.skipUnless(support.TESTFN_UNDECODABLE,
                         'need support.TESTFN_UNDECODABLE')
    def test_undecodable_filename(self):
        enc = sys.getfilesystemencoding()
        filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt'
        with open(os.path.join(self.tempdir, filename), 'wb') as f:
            f.write(support.TESTFN_UNDECODABLE)
        response = self.request(self.base_url + '/')
        if sys.platform == 'darwin':
            # On Mac OS the HFS+ filesystem replaces bytes that aren't valid
            # UTF-8 into a percent-encoded value.
            for name in os.listdir(self.tempdir):
                if name != 'test':  # Ignore a filename created in setUp().
                    filename = name
                    break
        body = self.check_status_and_reason(response, HTTPStatus.OK)
        quotedname = urllib.parse.quote(filename, errors='surrogatepass')
        self.assertIn(('href="%s"' % quotedname)
                      .encode(enc, 'surrogateescape'), body)
        self.assertIn(('>%s<' % html.escape(filename, quote=False))
                      .encode(enc, 'surrogateescape'), body)
        response = self.request(self.base_url + '/' + quotedname)
        self.check_status_and_reason(response, HTTPStatus.OK,
                                     data=support.TESTFN_UNDECODABLE)

    def test_get(self):
        # constructs the path relative to the root directory of the HTTPServer
        response = self.request(self.base_url + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        # check for trailing "/" which should return 404. See Issue17324
        response = self.request(self.base_url + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        response = self.request(self.base_url + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.base_url + "/?hi=1")
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)

        data = b"Dummy index file\r\n"
        # Relative path: setUp() made the parent of tempdir the cwd.
        with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
            f.write(data)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK, data)

        # chmod() doesn't work as expected on Windows, and filesystem
        # permissions are ignored by root on Unix.
        if os.name == 'posix' and os.geteuid() != 0:
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.base_url + '/')
                self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
            finally:
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.base_url + '/test', method='HEAD')
        self.check_status_and_reason(response, HTTPStatus.OK)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        # requests must be case sensitive, so this should fail too
        response = self.request('/', method='custom')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)

    def test_path_without_leading_slash(self):
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        response = self.request(self.tempdir_name + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        response = self.request(self.tempdir_name + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.tempdir_name + "/?hi=1")

    def test_html_escape_filename(self):
        filename = '<test&>.txt'
        fullpath = os.path.join(self.tempdir, filename)

        try:
            with open(fullpath, 'w'):
                pass
        except OSError:
            raise unittest.SkipTest('Can not create file %s on current file '
                                    'system' % filename)

        try:
            response = self.request(self.base_url + '/')
            body = self.check_status_and_reason(response, HTTPStatus.OK)
            enc = response.headers.get_content_charset()
        finally:
            os.unlink(fullpath)  # avoid affecting test_undecodable_filename

        self.assertIsNotNone(enc)
        html_text = '>%s<' % html.escape(filename, quote=False)
        self.assertIn(html_text.encode(enc), body)


# CGI script templates.  The first %s is filled with the interpreter path
# for the shebang line; cgi_file4 takes a second %s naming the environment
# variable to print.
cgi_file1 = """\
#!%s

print("Content-type: text/html")
print()
print("Hello World")
"""

cgi_file2 = """\
#!%s
import cgi

print("Content-type: text/html")
print()

form = cgi.FieldStorage()
print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
                          form.getfirst("bacon")))
"""

cgi_file4 = """\
#!%s
import os

print("Content-type: text/html")
print()

print(os.environ["%s"])
"""


@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
        "This test can't be run reliably as root (issue #13308).")
class CGIHTTPServerTestCase(BaseTestCase):
    """Exercise CGIHTTPRequestHandler with scripts in a temp 'cgi-bin'."""

    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
        pass

    linesep = os.linesep.encode('ascii')

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        self.parent_dir = tempfile.mkdtemp()
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
        self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
        os.mkdir(self.cgi_dir)
        os.mkdir(self.cgi_child_dir)
        # Paths stay None until the file exists so tearDown() only removes
        # what was actually created.
        self.nocgi_path = None
        self.file1_path = None
        self.file2_path = None
        self.file3_path = None
        self.file4_path = None

        # The shebang line should be pure ASCII: use symlink if possible.
        # See issue #7668.
        if support.can_symlink():
            self.pythonexe = os.path.join(self.parent_dir, 'python')
            os.symlink(sys.executable, self.pythonexe)
        else:
            self.pythonexe = sys.executable

        try:
            # The python executable path is written as the first line of the
            # CGI Python script. The encoding cookie cannot be used, and so the
            # path should be encodable to the default script encoding (utf-8)
            self.pythonexe.encode('utf-8')
        except UnicodeEncodeError:
            self.tearDown()
            self.skipTest("Python executable path is not encodable to utf-8")

        self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
        # Written as utf-8 like every other script below, matching the
        # default script encoding the shebang check above relies on.
        with open(self.nocgi_path, 'w', encoding='utf-8') as fp:
            fp.write(cgi_file1 % self.pythonexe)
        os.chmod(self.nocgi_path, 0o777)

        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
        with open(self.file1_path, 'w', encoding='utf-8') as file1:
            file1.write(cgi_file1 % self.pythonexe)
        os.chmod(self.file1_path, 0o777)

        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
        with open(self.file2_path, 'w', encoding='utf-8') as file2:
            file2.write(cgi_file2 % self.pythonexe)
        os.chmod(self.file2_path, 0o777)

        self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
        with open(self.file3_path, 'w', encoding='utf-8') as file3:
            file3.write(cgi_file1 % self.pythonexe)
        os.chmod(self.file3_path, 0o777)

        self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
        with open(self.file4_path, 'w', encoding='utf-8') as file4:
            file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
        os.chmod(self.file4_path, 0o777)

        os.chdir(self.parent_dir)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            if self.pythonexe != sys.executable:
                os.remove(self.pythonexe)
            if self.nocgi_path:
                os.remove(self.nocgi_path)
            if self.file1_path:
                os.remove(self.file1_path)
            if self.file2_path:
                os.remove(self.file2_path)
            if self.file3_path:
                os.remove(self.file3_path)
            if self.file4_path:
                os.remove(self.file4_path)
            os.rmdir(self.cgi_child_dir)
            os.rmdir(self.cgi_dir)
            os.rmdir(self.parent_dir)
        finally:
            BaseTestCase.tearDown(self)

    def test_url_collapse_path(self):
        # verify tail is the last portion and head is the rest on proper urls
        test_vectors = {
            '': '//',
            '..': IndexError,
            '/.//..': IndexError,
            '/': '//',
            '//': '//',
            '/\\': '//\\',
            '/.//': '//',
            'cgi-bin/file1.py': '/cgi-bin/file1.py',
            '/cgi-bin/file1.py': '/cgi-bin/file1.py',
            'a': '//a',
            '/a': '//a',
            '//a': '//a',
            './a': '//a',
            './C:/': '/C:/',
            '/a/b': '/a/b',
            '/a/b/': '/a/b/',
            '/a/b/.': '/a/b/',
            '/a/b/c/..': '/a/b/',
            '/a/b/c/../d': '/a/b/d',
            '/a/b/c/../d/e/../f': '/a/b/d/f',
            '/a/b/c/../d/e/../../f': '/a/b/f',
            '/a/b/c/../d/e/.././././..//f': '/a/b/f',
            '../a/b/c/../d/e/.././././..//f': IndexError,
            '/a/b/c/../d/e/../../../f': '/a/f',
            '/a/b/c/../d/e/../../../../f': '//f',
            '/a/b/c/../d/e/../../../../../f': IndexError,
            '/a/b/c/../d/e/../../../../f/..': '//',
            '/a/b/c/../d/e/../../../../f/../.': '//',
        }
        for path, expected in test_vectors.items():
            if isinstance(expected, type) and issubclass(expected, Exception):
                self.assertRaises(expected,
                                  server._url_collapse_path, path)
            else:
                actual = server._url_collapse_path(path)
                self.assertEqual(expected, actual,
                                 msg='path = %r\nGot:    %r\nWanted: %r' %
                                 (path, actual, expected))

    def test_headers_and_content(self):
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(
            (res.read(), res.getheader('Content-type'), res.status),
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))

    def test_issue19435(self):
        res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

    def test_post(self):
        params = urllib.parse.urlencode(
            {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)

        self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)

    def test_invaliduri(self):
        res = self.request('/cgi-bin/invalid')
        res.read()
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

    def test_authorization(self):
        headers = {b'Authorization' : b'Basic ' +
                   base64.b64encode(b'username:pass')}
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_no_leading_slash(self):
        # http://bugs.python.org/issue2254
        res = self.request('cgi-bin/file1.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_os_environ_is_not_altered(self):
        signature = "Test CGI Server"
        os.environ['SERVER_SOFTWARE'] = signature
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))
        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)

    def test_urlquote_decoding_in_cgi_check(self):
        res = self.request('/cgi-bin%2ffile1.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_nested_cgi_path_issue21323(self):
        res = self.request('/cgi-bin/child-dir/file3.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_query_with_multiple_question_mark(self):
        res = self.request('/cgi-bin/file4.py?a=b?c=d')
        self.assertEqual(
            (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_query_with_continuous_slashes(self):
        res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
        self.assertEqual(
            (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
             'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))


class SocketlessRequestHandler(SimpleHTTPRequestHandler):
    """Handler usable without a socket.

    __init__() deliberately does not call the base class constructor;
    tests assign ``rfile`` and ``wfile`` themselves before use.
    """

    def __init__(self):
        self.get_called = False
        self.protocol_version = "HTTP/1.1"

    def do_GET(self):
        self.get_called = True
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-Type', 'text/html')
        self.end_headers()
        self.wfile.write(b'<html><body>Data</body></html>\r\n')

    def log_message(self, format, *args):
        pass


class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
    """Variant that answers every Expect: 100-continue with a 417 error."""

    def handle_expect_100(self):
        self.send_error(HTTPStatus.EXPECTATION_FAILED)
        return False


class AuditableBytesIO:
    """Write-only sink that records each write() call separately."""

    def __init__(self):
        self.datas = []

    def write(self, data):
        self.datas.append(data)

    def getData(self):
        """Return everything written so far as one bytes object."""
        return b''.join(self.datas)

    @property
    def numWrites(self):
        """Number of write() calls made so far."""
        return len(self.datas)


class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
    """Test the functionality of the BaseHTTPServer.

       Test the support for the Expect 100-continue header.
       """

    # Raw bytes pattern with the dot escaped so it only matches 'HTTP/1.x'.
    HTTPResponseMatch = re.compile(rb'HTTP/1\.[0-9]+ 200 OK')

    def setUp(self):
        self.handler = SocketlessRequestHandler()

    def send_typical_request(self, message):
        """Feed *message* to the handler; return the output as lines."""
        input = BytesIO(message)
        output = BytesIO()
        self.handler.rfile = input
        self.handler.wfile = output
        self.handler.handle_one_request()
        output.seek(0)
        return output.readlines()

    def verify_get_called(self):
        self.assertTrue(self.handler.get_called)

    def verify_expected_headers(self, headers):
        # Each of these headers must appear exactly once.
        for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)

    def verify_http_server_response(self, response):
        match = self.HTTPResponseMatch.search(response)
        self.assertIsNotNone(match)

    def test_http_1_1(self):
        result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
        self.assertEqual(self.handler.command, 'GET')
        self.assertEqual(self.handler.path, '/')
        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
        self.assertSequenceEqual(self.handler.headers.items(), ())

    def test_http_1_0(self):
        result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
        self.assertEqual(self.handler.command, 'GET')
        self.assertEqual(self.handler.path, '/')
        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
        self.assertSequenceEqual(self.handler.headers.items(), ())

    def test_http_0_9(self):
        result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
        self.verify_get_called()

    def test_with_continue_1_0(self):
        result = self.send_typical_request(
            b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
        self.assertEqual(self.handler.command, 'GET')
        self.assertEqual(self.handler.path, '/')
        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
        headers = (("Expect", "100-continue"),)
        self.assertSequenceEqual(self.handler.headers.items(), headers)

    def test_with_continue_1_1(self):
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
        self.assertEqual(result[1], b'\r\n')
        self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n')
        self.verify_expected_headers(result[2:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
        self.assertEqual(self.handler.command, 'GET')
        self.assertEqual(self.handler.path, '/')
        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
        headers = (("Expect", "100-continue"),)
        self.assertSequenceEqual(self.handler.headers.items(), headers)

    def test_header_buffering_of_send_error(self):

        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
        output = AuditableBytesIO()
        handler = SocketlessRequestHandler()
        handler.rfile = input
        handler.wfile = output
        handler.request_version = 'HTTP/1.1'
        handler.requestline = ''
        handler.command = None

        handler.send_error(418)
        self.assertEqual(output.numWrites, 2)

    def test_header_buffering_of_send_response_only(self):

        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
        output = AuditableBytesIO()
        handler = SocketlessRequestHandler()
        handler.rfile = input
        handler.wfile = output
        handler.request_version = 'HTTP/1.1'

        handler.send_response_only(418)
        self.assertEqual(output.numWrites, 0)
        handler.end_headers()
        self.assertEqual(output.numWrites, 1)

    def test_header_buffering_of_send_header(self):

        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
        output = AuditableBytesIO()
        handler = SocketlessRequestHandler()
        handler.rfile = input
        handler.wfile = output
        handler.request_version = 'HTTP/1.1'

        handler.send_header('Foo', 'foo')
        handler.send_header('bar', 'bar')
        self.assertEqual(output.numWrites, 0)
        handler.end_headers()
        self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
        self.assertEqual(output.numWrites, 1)

    def test_header_unbuffered_when_continue(self):

        def _readAndReseek(f):
            # Read the whole stream without disturbing its position.
            pos = f.tell()
            f.seek(0)
            data = f.read()
            f.seek(pos)
            return data

        input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        output = BytesIO()
        self.handler.rfile = input
        self.handler.wfile = output
        self.handler.request_version = 'HTTP/1.1'

        self.handler.handle_one_request()
        self.assertNotEqual(_readAndReseek(output), b'')
        result = _readAndReseek(output).split(b'\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
        self.assertEqual(result[1], b'')
        self.assertEqual(result[2], b'HTTP/1.1 200 OK')

    def test_with_continue_rejected(self):
        usual_handler = self.handler        # Save to avoid breaking any subsequent tests.
        self.handler = RejectingSocketlessRequestHandler()
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
        self.verify_expected_headers(result[1:-1])
        # The expect handler should short circuit the usual get method by
        # returning false here, so get_called should be false
        self.assertFalse(self.handler.get_called)
        self.assertEqual(
            sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
        self.handler = usual_handler        # Restore to avoid breaking any subsequent tests.

    def test_request_length(self):
        # Issue #10714: huge request lines are discarded, to avoid Denial
        # of Service attacks.
        result = self.send_typical_request(b'GET ' + b'x' * 65537)
        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertIsInstance(self.handler.requestline, str)

    def test_header_length(self):
        # Issue #6791: same for headers
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    def test_too_many_headers(self):
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    def test_html_escape_on_error(self):
        result = self.send_typical_request(
            b'<script>alert("hello")</script> / HTTP/1.1')
        result = b''.join(result)
        text = '<script>alert("hello")</script>'
        self.assertIn(html.escape(text, quote=False).encode('ascii'), result)

    def test_close_connection(self):
        # handle_one_request() should be repeatedly called until
        # it sets close_connection
        def handle_one_request():
            self.handler.close_connection = next(close_values)
        self.handler.handle_one_request = handle_one_request

        close_values = iter((True,))
        self.handler.handle()
        self.assertRaises(StopIteration, next, close_values)

        close_values = iter((False, False, True))
        self.handler.handle()
        self.assertRaises(StopIteration, next, close_values)

    def test_date_time_string(self):
        now = time.time()
        # this is the old code that formats the timestamp
        year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now)
        expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
            self.handler.weekdayname[wd],
            day,
            self.handler.monthname[month],
            year, hh, mm, ss
        )
        self.assertEqual(self.handler.date_time_string(timestamp=now), expected)


class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    """ Test url parsing """
    def setUp(self):
        self.translated = os.getcwd()
        self.translated = os.path.join(self.translated, 'filename')
        self.handler = SocketlessRequestHandler()

    def test_query_arguments(self):
        path = self.handler.translate_path('/filename')
        self.assertEqual(path, self.translated)
        path = self.handler.translate_path('/filename?foo=bar')
        self.assertEqual(path, self.translated)
        path = self.handler.translate_path('/filename?a=b&spam=eggs#zot')
        self.assertEqual(path, self.translated)

    def test_start_with_double_slash(self):
        path = self.handler.translate_path('//filename')
        self.assertEqual(path, self.translated)
        path = self.handler.translate_path('//filename?foo=bar')
        self.assertEqual(path, self.translated)

    def test_windows_colon(self):
        # Make http.server see Windows path semantics for this test.
        with support.swap_attr(server.os, 'path', ntpath):
            path = self.handler.translate_path('c:c:c:foo/filename')
            path = path.replace(ntpath.sep, os.sep)
            self.assertEqual(path, self.translated)

            path = self.handler.translate_path('\\c:../filename')
            path = path.replace(ntpath.sep, os.sep)
            self.assertEqual(path, self.translated)

            path = self.handler.translate_path('c:\\c:..\\foo/filename')
            path = path.replace(ntpath.sep, os.sep)
            self.assertEqual(path, self.translated)

            path = self.handler.translate_path('c:c:foo\\c:c:bar/filename')
            path = path.replace(ntpath.sep, os.sep)
            self.assertEqual(path, self.translated)


class MiscTestCase(unittest.TestCase):
    def test_all(self):
        """http.server.__all__ must list every public name it defines."""
        expected = []
        blacklist = {'executable', 'nobody_uid', 'test'}
        for name in dir(server):
            if name.startswith('_') or name in blacklist:
                continue
            module_object = getattr(server, name)
            if getattr(module_object, '__module__', None) == 'http.server':
                expected.append(name)
        self.assertCountEqual(server.__all__, expected)


def test_main(verbose=None):
    """Run every test case in this module, restoring the cwd afterwards."""
    cwd = os.getcwd()
    try:
        support.run_unittest(
            RequestHandlerLoggingTestCase,
            BaseHTTPRequestHandlerTestCase,
            BaseHTTPServerTestCase,
            SimpleHTTPServerTestCase,
            CGIHTTPServerTestCase,
            SimpleHTTPRequestHandlerTestCase,
            MiscTestCase,
        )
    finally:
        os.chdir(cwd)

if __name__ == '__main__':
    test_main()