1"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6
7from http.server import BaseHTTPRequestHandler, HTTPServer, \
8     SimpleHTTPRequestHandler, CGIHTTPRequestHandler
9from http import server, HTTPStatus
10
11import os
12import sys
13import re
14import base64
15import ntpath
16import shutil
17import urllib.parse
18import html
19import http.client
20import tempfile
21import time
22from io import BytesIO
23
24import unittest
25from test import support
26threading = support.import_module('threading')
27
class NoLogRequestHandler:
    """Mixin for the test request handlers that keeps stderr quiet."""

    def log_message(self, *args):
        """Accept any arguments and drop them instead of logging to stderr."""
        return None

    def read(self, n=None):
        """Return an empty string whatever size *n* is requested."""
        return str()
35
36
class TestServerThread(threading.Thread):
    """Thread that runs an HTTPServer on an ephemeral localhost port.

    test_object is the object that receives the ``HOST`` and ``PORT``
    attributes and whose ``server_started`` event is set once the server
    socket is bound.  request_handler is the handler class passed to
    HTTPServer.
    """

    def __init__(self, test_object, request_handler):
        threading.Thread.__init__(self)
        self.request_handler = request_handler
        self.test_object = test_object

    def run(self):
        """Bind the server, publish its address, then serve until shutdown."""
        # Port 0 lets the OS pick a free port; the real address is read back
        # from the bound socket.
        self.server = HTTPServer(('localhost', 0), self.request_handler)
        self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
        self.test_object.server_started.set()
        # Drop the reference so the thread does not keep the test alive.
        self.test_object = None
        try:
            self.server.serve_forever(0.05)
        finally:
            self.server.server_close()

    def stop(self):
        """Shut the server down and wait for the thread to finish.

        shutdown() only waits for the serve_forever() loop to exit; joining
        afterwards guarantees that server_close() in run() has completed and
        the thread is gone before the caller continues its own cleanup.
        """
        self.server.shutdown()
        self.join()
55
56
class BaseTestCase(unittest.TestCase):
    """Common fixture: one server thread per test.

    Subclasses provide a ``request_handler`` class attribute.  After setUp()
    the server address is available as ``self.HOST`` / ``self.PORT`` (both
    are assigned by the server thread).
    """

    def setUp(self):
        self._threads = support.threading_setup()
        # Replaced by a guard object so tearDown() can call __exit__() on it.
        os.environ = support.EnvironmentVarGuard()
        self.server_started = threading.Event()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        # Block until the thread has published HOST and PORT.
        self.server_started.wait()

    def tearDown(self):
        self.thread.stop()
        self.thread = None
        os.environ.__exit__()
        support.threading_cleanup(*self._threads)

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request over a fresh connection and return the response.

        uri, method, body and headers are passed to
        HTTPConnection.request(); headers defaults to no extra headers.
        The connection is stored on ``self.connection``.
        """
        # A None sentinel avoids sharing one dict object between calls.
        if headers is None:
            headers = {}
        self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
76
77
class BaseHTTPServerTestCase(BaseTestCase):
    """Tests for BaseHTTPRequestHandler driven over a real connection.

    The nested request_handler defines several custom HTTP methods (TEST,
    KEEP, CUSTOM, ...) which the tests invoke by name through ``self.con``.
    """

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_TEST(self):
            self.send_response(HTTPStatus.NO_CONTENT)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

        def do_KEEP(self):
            self.send_response(HTTPStatus.NO_CONTENT)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'keep-alive')
            self.end_headers()

        def do_KEYERROR(self):
            # 999 is not a known status code.
            self.send_error(999)

        def do_NOTFOUND(self):
            self.send_error(HTTPStatus.NOT_FOUND)

        def do_EXPLAINERROR(self):
            self.send_error(999, "Short Message",
                            "This is a long \n explanation")

        def do_CUSTOM(self):
            self.send_response(999)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

        def do_LATINONEHEADER(self):
            # Sends a non-ASCII header value and echoes the incoming
            # non-ASCII header back as the UTF-8 encoded body.
            self.send_response(999)
            self.send_header('X-Special', 'Dängerous Mind')
            self.send_header('Connection', 'close')
            self.end_headers()
            body = self.headers['x-special-incoming'].encode('utf-8')
            self.wfile.write(body)

        def do_SEND_ERROR(self):
            # The status code to send is taken from the request path.
            self.send_error(int(self.path[1:]))

        def do_HEAD(self):
            self.send_error(int(self.path[1:]))

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        # Registered before connect() so the client socket is always
        # released, whichever way the test ends.
        self.addCleanup(self.con.close)
        self.con.connect()

    def test_command(self):
        self.con.request('GET', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_request_line_trimming(self):
        self.con._http_vsn_str = 'HTTP/1.1\n'
        self.con.putrequest('XYZBOGUS', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_bogus(self):
        self.con._http_vsn_str = 'FUBAR'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_digits(self):
        self.con._http_vsn_str = 'HTTP/9.9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_none_get(self):
        self.con._http_vsn_str = ''
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_none(self):
        # Test that a valid method is rejected when not HTTP/1.x
        self.con._http_vsn_str = ''
        self.con.putrequest('CUSTOM', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        self.con._http_vsn_str = 'HTTP/9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)

    def test_send_blank(self):
        self.con._http_vsn_str = ''
        self.con.putrequest('', '')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_header_close(self):
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'close')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_header_keep_alive(self):
        self.con._http_vsn_str = 'HTTP/1.1'
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'keep-alive')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_handler(self):
        self.con.request('TEST', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NO_CONTENT)

    def test_return_header_keep_alive(self):
        self.con.request('KEEP', '/')
        res = self.con.getresponse()
        self.assertEqual(res.getheader('Connection'), 'keep-alive')
        # The second request is sent without reading its response; the
        # connection is closed by the cleanup registered in setUp().
        self.con.request('TEST', '/')

    def test_internal_key_error(self):
        self.con.request('KEYERROR', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)

    def test_return_custom_status(self):
        self.con.request('CUSTOM', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)

    def test_return_explain_error(self):
        self.con.request('EXPLAINERROR', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)
        self.assertTrue(int(res.getheader('Content-Length')))

    def test_latin1_header(self):
        self.con.request('LATINONEHEADER', '/', headers={
            'X-Special-Incoming':       'Ärger mit Unicode'
        })
        res = self.con.getresponse()
        self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
        self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))

    def test_error_content_length(self):
        # Issue #16088: standard error responses should have a content-length
        self.con.request('NOTFOUND', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

        data = res.read()
        self.assertEqual(int(res.getheader('Content-Length')), len(data))

    def test_send_error(self):
        # None of these status codes may carry a body or entity headers.
        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
                                         HTTPStatus.RESET_CONTENT)
        for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
                     HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
                     HTTPStatus.SWITCHING_PROTOCOLS):
            self.con.request('SEND_ERROR', '/{}'.format(code))
            res = self.con.getresponse()
            self.assertEqual(code, res.status)
            self.assertEqual(None, res.getheader('Content-Length'))
            self.assertEqual(None, res.getheader('Content-Type'))
            if code not in allow_transfer_encoding_codes:
                self.assertEqual(None, res.getheader('Transfer-Encoding'))

            data = res.read()
            self.assertEqual(b'', data)

    def test_head_via_send_error(self):
        # A HEAD response never has a body; only 200 keeps entity headers.
        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
                                         HTTPStatus.RESET_CONTENT)
        for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
                     HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
                     HTTPStatus.SWITCHING_PROTOCOLS):
            self.con.request('HEAD', '/{}'.format(code))
            res = self.con.getresponse()
            self.assertEqual(code, res.status)
            if code == HTTPStatus.OK:
                self.assertTrue(int(res.getheader('Content-Length')) > 0)
                self.assertIn('text/html', res.getheader('Content-Type'))
            else:
                self.assertEqual(None, res.getheader('Content-Length'))
                self.assertEqual(None, res.getheader('Content-Type'))
            if code not in allow_transfer_encoding_codes:
                self.assertEqual(None, res.getheader('Transfer-Encoding'))

            data = res.read()
            self.assertEqual(b'', data)
283
284
class RequestHandlerLoggingTestCase(BaseTestCase):
    """Check what BaseHTTPRequestHandler writes to stderr per request.

    The handler deliberately does not mix in NoLogRequestHandler, so the
    default logging stays active and can be captured.
    """

    class request_handler(BaseHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_GET(self):
            self.send_response(HTTPStatus.OK)
            self.end_headers()

        def do_ERROR(self):
            self.send_error(HTTPStatus.NOT_FOUND, 'File not found')

    def test_get(self):
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        # Make sure the client socket is released once the test is over.
        self.addCleanup(self.con.close)
        self.con.connect()

        with support.captured_stderr() as err:
            self.con.request('GET', '/')
            self.con.getresponse()

        self.assertTrue(
            err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n'))

    def test_err(self):
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        # Make sure the client socket is released once the test is over.
        self.addCleanup(self.con.close)
        self.con.connect()

        with support.captured_stderr() as err:
            self.con.request('ERROR', '/')
            self.con.getresponse()

        # First line is the error log entry, second the request log entry.
        lines = err.getvalue().split('\n')
        self.assertTrue(lines[0].endswith('code 404, message File not found'))
        self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -'))
319
320
class SimpleHTTPServerTestCase(BaseTestCase):
    """Tests for SimpleHTTPRequestHandler serving a temporary directory.

    setUp() changes the working directory to the system temp directory and
    creates a fresh sub-directory holding one file named ``test``;
    ``self.base_url`` is the URL path of that sub-directory.
    """

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = b'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        self.base_url = '/' + self.tempdir_name
        with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
            temp.write(self.data)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            # Best-effort removal: failures to delete are ignored, but
            # unrelated exceptions are no longer swallowed.
            shutil.rmtree(self.tempdir, ignore_errors=True)
        finally:
            BaseTestCase.tearDown(self)

    def check_status_and_reason(self, response, status, data=None):
        """Validate *response* and return its body.

        Asserts that the status equals *status*, that a reason is present,
        that the body equals *data* when *data* is given (an empty expected
        body is compared too), and that the server answered with HTTP/1.0,
        did not keep the connection open and sent nothing after the body.
        """
        def close_conn():
            """Don't close reader yet so we can check if there was leftover
            buffered input"""
            nonlocal reader
            reader = response.fp
            response.fp = None
        reader = None
        response._close_conn = close_conn

        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertIsNotNone(response.reason)
        if data is not None:
            self.assertEqual(data, body)
        # Ensure the server has not set up a persistent connection, and has
        # not sent any extra data
        self.assertEqual(response.version, 10)
        self.assertEqual(response.msg.get("Connection", "close"), "close")
        self.assertEqual(reader.read(30), b'', 'Connection should be closed')

        reader.close()
        return body

    @support.requires_mac_ver(10, 5)
    @unittest.skipIf(sys.platform == 'win32',
                     'undecodable name cannot be decoded on win32')
    @unittest.skipUnless(support.TESTFN_UNDECODABLE,
                         'need support.TESTFN_UNDECODABLE')
    def test_undecodable_filename(self):
        enc = sys.getfilesystemencoding()
        filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt'
        with open(os.path.join(self.tempdir, filename), 'wb') as f:
            f.write(support.TESTFN_UNDECODABLE)
        response = self.request(self.base_url + '/')
        if sys.platform == 'darwin':
            # On Mac OS the HFS+ filesystem replaces bytes that aren't valid
            # UTF-8 into a percent-encoded value.
            for name in os.listdir(self.tempdir):
                if name != 'test': # Ignore a filename created in setUp().
                    filename = name
                    break
        body = self.check_status_and_reason(response, HTTPStatus.OK)
        quotedname = urllib.parse.quote(filename, errors='surrogatepass')
        self.assertIn(('href="%s"' % quotedname)
                      .encode(enc, 'surrogateescape'), body)
        self.assertIn(('>%s<' % html.escape(filename, quote=False))
                      .encode(enc, 'surrogateescape'), body)
        response = self.request(self.base_url + '/' + quotedname)
        self.check_status_and_reason(response, HTTPStatus.OK,
                                     data=support.TESTFN_UNDECODABLE)

    def test_get(self):
        #constructs the path relative to the root directory of the HTTPServer
        response = self.request(self.base_url + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        # check for trailing "/" which should return 404. See Issue17324
        response = self.request(self.base_url + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        response = self.request(self.base_url + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.base_url + "/?hi=1")
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)

        # The relative path works because setUp() made the parent of the
        # temporary directory the current working directory.
        data = b"Dummy index file\r\n"
        with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
            f.write(data)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK, data)

        # chmod() doesn't work as expected on Windows, and filesystem
        # permissions are ignored by root on Unix.
        if os.name == 'posix' and os.geteuid() != 0:
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.base_url + '/')
                self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
            finally:
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.base_url + '/test', method='HEAD')
        self.check_status_and_reason(response, HTTPStatus.OK)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        # requests must be case sensitive,so this should fail too
        response = self.request('/', method='custom')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)

    def test_path_without_leading_slash(self):
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        response = self.request(self.tempdir_name + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        response = self.request(self.tempdir_name + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.tempdir_name + "/?hi=1")

    def test_html_escape_filename(self):
        filename = '<test&>.txt'
        fullpath = os.path.join(self.tempdir, filename)

        try:
            open(fullpath, 'w').close()
        except OSError:
            raise unittest.SkipTest('Can not create file %s on current file '
                                    'system' % filename)

        try:
            response = self.request(self.base_url + '/')
            body = self.check_status_and_reason(response, HTTPStatus.OK)
            enc = response.headers.get_content_charset()
        finally:
            os.unlink(fullpath)  # avoid affecting test_undecodable_filename

        self.assertIsNotNone(enc)
        html_text = '>%s<' % html.escape(filename, quote=False)
        self.assertIn(html_text.encode(enc), body)
492
493
# Templates for the CGI scripts written to disk by CGIHTTPServerTestCase.
# In each of them the first %s receives the path of the Python executable
# used for the shebang line.

# Minimal script: prints a text/html header and "Hello World".
cgi_file1 = """\
#!%s

print("Content-type: text/html")
print()
print("Hello World")
"""

# Echoes the submitted form fields "spam", "eggs" and "bacon".  The doubled
# %% survive the outer %-formatting as single % characters in the script.
cgi_file2 = """\
#!%s
import cgi

print("Content-type: text/html")
print()

form = cgi.FieldStorage()
print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
                          form.getfirst("bacon")))
"""

# Prints one environment variable; the second %s is the variable's name.
cgi_file4 = """\
#!%s
import os

print("Content-type: text/html")
print()

print(os.environ["%s"])
"""
523
524
@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
        "This test can't be run reliably as root (issue #13308).")
class CGIHTTPServerTestCase(BaseTestCase):
    """Tests for CGIHTTPRequestHandler.

    setUp() builds a temporary tree containing ``nocgi.py`` at the top
    level and executable scripts under ``cgi-bin/`` (one of them in
    ``cgi-bin/child-dir/``), then makes that tree the working directory.
    """

    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
        pass

    # The scripts use print(), so their output ends with the platform's
    # line separator.
    linesep = os.linesep.encode('ascii')

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        self.parent_dir = tempfile.mkdtemp()
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
        self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
        os.mkdir(self.cgi_dir)
        os.mkdir(self.cgi_child_dir)
        # Paths start as None so tearDown() only removes what was created.
        self.nocgi_path = None
        self.file1_path = None
        self.file2_path = None
        self.file3_path = None
        self.file4_path = None

        # The shebang line should be pure ASCII: use symlink if possible.
        # See issue #7668.
        if support.can_symlink():
            self.pythonexe = os.path.join(self.parent_dir, 'python')
            os.symlink(sys.executable, self.pythonexe)
        else:
            self.pythonexe = sys.executable

        try:
            # The python executable path is written as the first line of the
            # CGI Python script. The encoding cookie cannot be used, and so the
            # path should be encodable to the default script encoding (utf-8)
            self.pythonexe.encode('utf-8')
        except UnicodeEncodeError:
            self.tearDown()
            self.skipTest("Python executable path is not encodable to utf-8")

        # Written as UTF-8 like every other script below, so the shebang
        # line does not depend on the locale encoding.
        self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
        with open(self.nocgi_path, 'w', encoding='utf-8') as fp:
            fp.write(cgi_file1 % self.pythonexe)
        os.chmod(self.nocgi_path, 0o777)

        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
        with open(self.file1_path, 'w', encoding='utf-8') as file1:
            file1.write(cgi_file1 % self.pythonexe)
        os.chmod(self.file1_path, 0o777)

        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
        with open(self.file2_path, 'w', encoding='utf-8') as file2:
            file2.write(cgi_file2 % self.pythonexe)
        os.chmod(self.file2_path, 0o777)

        self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
        with open(self.file3_path, 'w', encoding='utf-8') as file3:
            file3.write(cgi_file1 % self.pythonexe)
        os.chmod(self.file3_path, 0o777)

        self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
        with open(self.file4_path, 'w', encoding='utf-8') as file4:
            file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
        os.chmod(self.file4_path, 0o777)

        os.chdir(self.parent_dir)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            if self.pythonexe != sys.executable:
                os.remove(self.pythonexe)
            if self.nocgi_path:
                os.remove(self.nocgi_path)
            if self.file1_path:
                os.remove(self.file1_path)
            if self.file2_path:
                os.remove(self.file2_path)
            if self.file3_path:
                os.remove(self.file3_path)
            if self.file4_path:
                os.remove(self.file4_path)
            os.rmdir(self.cgi_child_dir)
            os.rmdir(self.cgi_dir)
            os.rmdir(self.parent_dir)
        finally:
            BaseTestCase.tearDown(self)

    def test_url_collapse_path(self):
        # verify tail is the last portion and head is the rest on proper urls
        # Values are either the expected result or the exception class that
        # server._url_collapse_path() must raise for that input.
        test_vectors = {
            '': '//',
            '..': IndexError,
            '/.//..': IndexError,
            '/': '//',
            '//': '//',
            '/\\': '//\\',
            '/.//': '//',
            'cgi-bin/file1.py': '/cgi-bin/file1.py',
            '/cgi-bin/file1.py': '/cgi-bin/file1.py',
            'a': '//a',
            '/a': '//a',
            '//a': '//a',
            './a': '//a',
            './C:/': '/C:/',
            '/a/b': '/a/b',
            '/a/b/': '/a/b/',
            '/a/b/.': '/a/b/',
            '/a/b/c/..': '/a/b/',
            '/a/b/c/../d': '/a/b/d',
            '/a/b/c/../d/e/../f': '/a/b/d/f',
            '/a/b/c/../d/e/../../f': '/a/b/f',
            '/a/b/c/../d/e/.././././..//f': '/a/b/f',
            '../a/b/c/../d/e/.././././..//f': IndexError,
            '/a/b/c/../d/e/../../../f': '/a/f',
            '/a/b/c/../d/e/../../../../f': '//f',
            '/a/b/c/../d/e/../../../../../f': IndexError,
            '/a/b/c/../d/e/../../../../f/..': '//',
            '/a/b/c/../d/e/../../../../f/../.': '//',
        }
        for path, expected in test_vectors.items():
            if isinstance(expected, type) and issubclass(expected, Exception):
                self.assertRaises(expected,
                                  server._url_collapse_path, path)
            else:
                actual = server._url_collapse_path(path)
                self.assertEqual(expected, actual,
                                 msg='path = %r\nGot:    %r\nWanted: %r' %
                                 (path, actual, expected))

    def test_headers_and_content(self):
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(
            (res.read(), res.getheader('Content-type'), res.status),
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))

    def test_issue19435(self):
        res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

    def test_post(self):
        params = urllib.parse.urlencode(
            {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)

        self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)

    def test_invaliduri(self):
        res = self.request('/cgi-bin/invalid')
        res.read()
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

    def test_authorization(self):
        headers = {b'Authorization' : b'Basic ' +
                   base64.b64encode(b'username:pass')}
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_no_leading_slash(self):
        # http://bugs.python.org/issue2254
        res = self.request('cgi-bin/file1.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_os_environ_is_not_altered(self):
        signature = "Test CGI Server"
        os.environ['SERVER_SOFTWARE'] = signature
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))
        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)

    def test_urlquote_decoding_in_cgi_check(self):
        res = self.request('/cgi-bin%2ffile1.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_nested_cgi_path_issue21323(self):
        res = self.request('/cgi-bin/child-dir/file3.py')
        self.assertEqual(
            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_query_with_multiple_question_mark(self):
        res = self.request('/cgi-bin/file4.py?a=b?c=d')
        self.assertEqual(
            (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_query_with_continuous_slashes(self):
        res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
        self.assertEqual(
            (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
             'text/html', HTTPStatus.OK),
            (res.read(), res.getheader('Content-type'), res.status))
725
726
class SocketlessRequestHandler(SimpleHTTPRequestHandler):
    """Request handler that is created without a socket or server.

    The constructor does not call the base-class constructor, so ``rfile``
    and ``wfile`` have to be assigned by the caller before a request is
    handled.
    """

    def __init__(self):
        self.protocol_version = "HTTP/1.1"
        self.get_called = False

    def do_GET(self):
        """Record that GET was handled and reply 200 with a fixed page."""
        self.get_called = True
        page = b'<html><body>Data</body></html>\r\n'
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-Type', 'text/html')
        self.end_headers()
        self.wfile.write(page)

    def log_message(self, format, *args):
        """Discard the log entry."""
        return None
741
class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
    """Socketless handler whose handle_expect_100() hook always refuses."""

    def handle_expect_100(self):
        """Send a 417 Expectation Failed error and return False."""
        refusal = HTTPStatus.EXPECTATION_FAILED
        self.send_error(refusal)
        return False
746
747
class AuditableBytesIO:
    """Write-only byte sink that keeps every write() call separately."""

    def __init__(self):
        # One entry per write() call, in call order.
        self.datas = list()

    def write(self, data):
        """Store *data* as a new entry."""
        self.datas += [data]

    def getData(self):
        """Return all stored entries concatenated into one bytes object."""
        return bytes().join(self.datas)

    @property
    def numWrites(self):
        """Number of write() calls recorded so far."""
        count = len(self.datas)
        return count
762
763
class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
    """Test the request handling of BaseHTTPRequestHandler without sockets.

    Requests are read from, and responses written to, in-memory buffers
    attached to a SocketlessRequestHandler.  Besides plain GET requests
    this covers the Expect: 100-continue header, header buffering, the
    limits on request and header size, escaping in error pages, the
    handle() loop and date formatting.
    """

    # Status line of a successful response.  The dot is escaped so that it
    # only matches a literal "." between the major and minor version.
    HTTPResponseMatch = re.compile(rb'HTTP/1\.[0-9]+ 200 OK')

    def setUp(self):
        self.handler = SocketlessRequestHandler()

    def send_typical_request(self, message):
        """Feed *message* to the handler and return the response lines.

        *message* is the raw request as bytes.  The return value is the
        list produced by readlines() on the captured output, so every
        element keeps its line terminator.
        """
        request = BytesIO(message)
        response = BytesIO()
        self.handler.rfile = request
        self.handler.wfile = response
        self.handler.handle_one_request()
        response.seek(0)
        return response.readlines()

    def verify_get_called(self):
        """Assert that the handler's do_GET() has run."""
        self.assertTrue(self.handler.get_called)

    def verify_expected_headers(self, headers):
        """Assert Server, Date and Content-Type each occur exactly once."""
        for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)

    def verify_http_server_response(self, response):
        """Assert *response* contains an ``HTTP/1.x 200 OK`` status line."""
        match = self.HTTPResponseMatch.search(response)
        self.assertIsNotNone(match)

    def _verify_parsed_request(self, version, headers=()):
        """Assert the handler parsed ``GET / <version>`` carrying *headers*.

        *headers* is the expected sequence of (name, value) pairs.
        """
        self.assertEqual(self.handler.requestline, 'GET / ' + version)
        self.assertEqual(self.handler.command, 'GET')
        self.assertEqual(self.handler.path, '/')
        self.assertEqual(self.handler.request_version, version)
        self.assertSequenceEqual(self.handler.headers.items(), headers)

    def _verify_ok_get(self, result, version, headers=()):
        """Run the checks shared by the successful plain GET tests.

        *result* is the list of response lines, *version* the HTTP version
        string of the request and *headers* the expected request headers.
        """
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self._verify_parsed_request(version, headers)

    def _make_audited_handler(self):
        """Return a fresh HTTP/1.1 handler and the sink it writes to.

        The sink is an AuditableBytesIO, which lets the header-buffering
        tests count individual write() calls.
        """
        sink = AuditableBytesIO()
        handler = SocketlessRequestHandler()
        handler.rfile = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
        handler.wfile = sink
        handler.request_version = 'HTTP/1.1'
        return handler, sink

    def test_http_1_1(self):
        result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
        self._verify_ok_get(result, 'HTTP/1.1')

    def test_http_1_0(self):
        result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
        self._verify_ok_get(result, 'HTTP/1.0')

    def test_http_0_9(self):
        result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
        self.assertEqual(len(result), 1)
        self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
        self.verify_get_called()

    def test_with_continue_1_0(self):
        result = self.send_typical_request(
            b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
        self._verify_ok_get(result, 'HTTP/1.0',
                            (("Expect", "100-continue"),))

    def test_with_continue_1_1(self):
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
        self.assertEqual(result[1], b'\r\n')
        self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n')
        self.verify_expected_headers(result[2:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        self._verify_parsed_request('HTTP/1.1',
                                    (("Expect", "100-continue"),))

    def test_header_buffering_of_send_error(self):
        handler, output = self._make_audited_handler()
        handler.requestline = ''
        handler.command = None

        handler.send_error(418)
        self.assertEqual(output.numWrites, 2)

    def test_header_buffering_of_send_response_only(self):
        handler, output = self._make_audited_handler()

        handler.send_response_only(418)
        self.assertEqual(output.numWrites, 0)
        handler.end_headers()
        self.assertEqual(output.numWrites, 1)

    def test_header_buffering_of_send_header(self):
        handler, output = self._make_audited_handler()

        handler.send_header('Foo', 'foo')
        handler.send_header('bar', 'bar')
        self.assertEqual(output.numWrites, 0)
        handler.end_headers()
        self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
        self.assertEqual(output.numWrites, 1)

    def test_header_unbuffered_when_continue(self):

        def _snapshot(stream):
            # Read the whole stream without disturbing its position.
            position = stream.tell()
            stream.seek(0)
            data = stream.read()
            stream.seek(position)
            return data

        request = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        output = BytesIO()
        self.handler.rfile = request
        self.handler.wfile = output
        self.handler.request_version = 'HTTP/1.1'

        self.handler.handle_one_request()
        self.assertNotEqual(_snapshot(output), b'')
        result = _snapshot(output).split(b'\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
        self.assertEqual(result[1], b'')
        self.assertEqual(result[2], b'HTTP/1.1 200 OK')

    def test_with_continue_rejected(self):
        # Put the usual handler back afterwards, even if an assertion
        # below fails, to avoid breaking any subsequent tests.
        self.addCleanup(setattr, self, 'handler', self.handler)
        self.handler = RejectingSocketlessRequestHandler()
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
        self.verify_expected_headers(result[1:-1])
        # The expect handler should short circuit the usual get method by
        # returning false here, so get_called should be false
        self.assertFalse(self.handler.get_called)
        self.assertEqual(
            sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)

    def test_request_length(self):
        # Issue #10714: huge request lines are discarded, to avoid Denial
        # of Service attacks.
        result = self.send_typical_request(b'GET ' + b'x' * 65537)
        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertIsInstance(self.handler.requestline, str)

    def test_header_length(self):
        # Issue #6791: same for headers
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    def test_too_many_headers(self):
        result = self.send_typical_request(
            b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    def test_html_escape_on_error(self):
        result = self.send_typical_request(
            b'<script>alert("hello")</script> / HTTP/1.1')
        result = b''.join(result)
        text = '<script>alert("hello")</script>'
        self.assertIn(html.escape(text, quote=False).encode('ascii'), result)

    def test_close_connection(self):
        # handle_one_request() should be repeatedly called until
        # it sets close_connection
        def handle_one_request():
            self.handler.close_connection = next(close_values)
        self.handler.handle_one_request = handle_one_request

        for sequence in ((True,), (False, False, True)):
            close_values = iter(sequence)
            self.handler.handle()
            # Every value must have been consumed, and no more than that.
            self.assertRaises(StopIteration, next, close_values)

    def test_date_time_string(self):
        now = time.time()
        # this is the old code that formats the timestamp
        tm = time.gmtime(now)
        expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
            self.handler.weekdayname[tm.tm_wday],
            tm.tm_mday,
            self.handler.monthname[tm.tm_mon],
            tm.tm_year, tm.tm_hour, tm.tm_min, tm.tm_sec
        )
        self.assertEqual(self.handler.date_time_string(timestamp=now), expected)
987
988
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    """Test how translate_path() turns request paths into file paths.

    Every request used below is expected to come out as the file
    ``filename`` in the current working directory.
    """

    def setUp(self):
        self.translated = os.path.join(os.getcwd(), 'filename')
        self.handler = SocketlessRequestHandler()

    def test_query_arguments(self):
        # A query string or fragment must not end up in the file path.
        requests = (
            '/filename',
            '/filename?foo=bar',
            '/filename?a=b&spam=eggs#zot',
        )
        for request in requests:
            path = self.handler.translate_path(request)
            self.assertEqual(path, self.translated)

    def test_start_with_double_slash(self):
        # A doubled leading slash is treated like a single one.
        for request in ('//filename', '//filename?foo=bar'):
            path = self.handler.translate_path(request)
            self.assertEqual(path, self.translated)

    def test_windows_colon(self):
        requests = (
            'c:c:c:foo/filename',
            '\\c:../filename',
            'c:\\c:..\\foo/filename',
            'c:c:foo\\c:c:bar/filename',
        )
        # Make http.server use Windows path rules for the duration.
        with support.swap_attr(server.os, 'path', ntpath):
            for request in requests:
                path = self.handler.translate_path(request)
                # Compare using the separator of the real platform.
                path = path.replace(ntpath.sep, os.sep)
                self.assertEqual(path, self.translated)
1027
1028
class MiscTestCase(unittest.TestCase):
    """Checks on the http.server module as a whole."""

    def test_all(self):
        # __all__ must hold exactly the public names that http.server
        # defines itself, apart from the few names excluded below.
        skipped = {'executable', 'nobody_uid', 'test'}
        expected = [
            name for name in dir(server)
            if not (name.startswith('_') or name in skipped)
            and getattr(getattr(server, name),
                        '__module__', None) == 'http.server'
        ]
        self.assertCountEqual(server.__all__, expected)
1040
1041
def test_main(verbose=None):
    """Run the test cases listed below, then restore the working directory.

    *verbose* is accepted but not used by this function.
    """
    original_dir = os.getcwd()
    try:
        cases = (
            RequestHandlerLoggingTestCase,
            BaseHTTPRequestHandlerTestCase,
            BaseHTTPServerTestCase,
            SimpleHTTPServerTestCase,
            CGIHTTPServerTestCase,
            SimpleHTTPRequestHandlerTestCase,
            MiscTestCase,
        )
        support.run_unittest(*cases)
    finally:
        # Go back to where we started, whatever the tests did.
        os.chdir(original_dir)
1056
# Call test_main() when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
1059