1"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6
7import os
8import sys
9import re
10import base64
11import shutil
12import urllib
13import httplib
14import tempfile
15import unittest
16import CGIHTTPServer
17
18
19from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
20from SimpleHTTPServer import SimpleHTTPRequestHandler
21from CGIHTTPServer import CGIHTTPRequestHandler
22from StringIO import StringIO
23from test import test_support
24
25
26threading = test_support.import_module('threading')
27
28
29class NoLogRequestHandler:
30    def log_message(self, *args):
31        # don't write log messages to stderr
32        pass
33
34class SocketlessRequestHandler(SimpleHTTPRequestHandler):
35    def __init__(self):
36        self.get_called = False
37        self.protocol_version = "HTTP/1.1"
38
39    def do_GET(self):
40        self.get_called = True
41        self.send_response(200)
42        self.send_header('Content-Type', 'text/html')
43        self.end_headers()
44        self.wfile.write(b'<html><body>Data</body></html>\r\n')
45
46    def log_message(self, fmt, *args):
47        pass
48
49
50class TestServerThread(threading.Thread):
51    def __init__(self, test_object, request_handler):
52        threading.Thread.__init__(self)
53        self.request_handler = request_handler
54        self.test_object = test_object
55
56    def run(self):
57        self.server = HTTPServer(('', 0), self.request_handler)
58        self.test_object.PORT = self.server.socket.getsockname()[1]
59        self.test_object.server_started.set()
60        self.test_object = None
61        try:
62            self.server.serve_forever(0.05)
63        finally:
64            self.server.server_close()
65
66    def stop(self):
67        self.server.shutdown()
68
69
70class BaseTestCase(unittest.TestCase):
71    def setUp(self):
72        self._threads = test_support.threading_setup()
73        os.environ = test_support.EnvironmentVarGuard()
74        self.server_started = threading.Event()
75        self.thread = TestServerThread(self, self.request_handler)
76        self.thread.start()
77        self.server_started.wait()
78
79    def tearDown(self):
80        self.thread.stop()
81        os.environ.__exit__()
82        test_support.threading_cleanup(*self._threads)
83
84    def request(self, uri, method='GET', body=None, headers={}):
85        self.connection = httplib.HTTPConnection('localhost', self.PORT)
86        self.connection.request(method, uri, body, headers)
87        return self.connection.getresponse()
88
89class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
90    """Test the functionality of the BaseHTTPServer focussing on
91    BaseHTTPRequestHandler.
92    """
93
94    HTTPResponseMatch = re.compile('HTTP/1.[0-9]+ 200 OK')
95
96    def setUp (self):
97        self.handler = SocketlessRequestHandler()
98
99    def send_typical_request(self, message):
100        input_msg = StringIO(message)
101        output = StringIO()
102        self.handler.rfile = input_msg
103        self.handler.wfile = output
104        self.handler.handle_one_request()
105        output.seek(0)
106        return output.readlines()
107
108    def verify_get_called(self):
109        self.assertTrue(self.handler.get_called)
110
111    def verify_expected_headers(self, headers):
112        for fieldName in 'Server: ', 'Date: ', 'Content-Type: ':
113            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
114
115    def verify_http_server_response(self, response):
116        match = self.HTTPResponseMatch.search(response)
117        self.assertTrue(match is not None)
118
119    def test_http_1_1(self):
120        result = self.send_typical_request('GET / HTTP/1.1\r\n\r\n')
121        self.verify_http_server_response(result[0])
122        self.verify_expected_headers(result[1:-1])
123        self.verify_get_called()
124        self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')
125
126    def test_http_1_0(self):
127        result = self.send_typical_request('GET / HTTP/1.0\r\n\r\n')
128        self.verify_http_server_response(result[0])
129        self.verify_expected_headers(result[1:-1])
130        self.verify_get_called()
131        self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')
132
133    def test_http_0_9(self):
134        result = self.send_typical_request('GET / HTTP/0.9\r\n\r\n')
135        self.assertEqual(len(result), 1)
136        self.assertEqual(result[0], '<html><body>Data</body></html>\r\n')
137        self.verify_get_called()
138
139    def test_with_continue_1_0(self):
140        result = self.send_typical_request('GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
141        self.verify_http_server_response(result[0])
142        self.verify_expected_headers(result[1:-1])
143        self.verify_get_called()
144        self.assertEqual(result[-1], '<html><body>Data</body></html>\r\n')
145
146    def test_request_length(self):
147        # Issue #10714: huge request lines are discarded, to avoid Denial
148        # of Service attacks.
149        result = self.send_typical_request(b'GET ' + b'x' * 65537)
150        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
151        self.assertFalse(self.handler.get_called)
152
153
154class BaseHTTPServerTestCase(BaseTestCase):
155    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
156        protocol_version = 'HTTP/1.1'
157        default_request_version = 'HTTP/1.1'
158
159        def do_TEST(self):
160            self.send_response(204)
161            self.send_header('Content-Type', 'text/html')
162            self.send_header('Connection', 'close')
163            self.end_headers()
164
165        def do_KEEP(self):
166            self.send_response(204)
167            self.send_header('Content-Type', 'text/html')
168            self.send_header('Connection', 'keep-alive')
169            self.end_headers()
170
171        def do_KEYERROR(self):
172            self.send_error(999)
173
174        def do_CUSTOM(self):
175            self.send_response(999)
176            self.send_header('Content-Type', 'text/html')
177            self.send_header('Connection', 'close')
178            self.end_headers()
179
180    def setUp(self):
181        BaseTestCase.setUp(self)
182        self.con = httplib.HTTPConnection('localhost', self.PORT)
183        self.con.connect()
184
185    def test_command(self):
186        self.con.request('GET', '/')
187        res = self.con.getresponse()
188        self.assertEqual(res.status, 501)
189
190    def test_request_line_trimming(self):
191        self.con._http_vsn_str = 'HTTP/1.1\n'
192        self.con.putrequest('GET', '/')
193        self.con.endheaders()
194        res = self.con.getresponse()
195        self.assertEqual(res.status, 501)
196
197    def test_version_bogus(self):
198        self.con._http_vsn_str = 'FUBAR'
199        self.con.putrequest('GET', '/')
200        self.con.endheaders()
201        res = self.con.getresponse()
202        self.assertEqual(res.status, 400)
203
204    def test_version_digits(self):
205        self.con._http_vsn_str = 'HTTP/9.9.9'
206        self.con.putrequest('GET', '/')
207        self.con.endheaders()
208        res = self.con.getresponse()
209        self.assertEqual(res.status, 400)
210
211    def test_version_none_get(self):
212        self.con._http_vsn_str = ''
213        self.con.putrequest('GET', '/')
214        self.con.endheaders()
215        res = self.con.getresponse()
216        self.assertEqual(res.status, 501)
217
218    def test_version_none(self):
219        self.con._http_vsn_str = ''
220        self.con.putrequest('PUT', '/')
221        self.con.endheaders()
222        res = self.con.getresponse()
223        self.assertEqual(res.status, 400)
224
225    def test_version_invalid(self):
226        self.con._http_vsn = 99
227        self.con._http_vsn_str = 'HTTP/9.9'
228        self.con.putrequest('GET', '/')
229        self.con.endheaders()
230        res = self.con.getresponse()
231        self.assertEqual(res.status, 505)
232
233    def test_send_blank(self):
234        self.con._http_vsn_str = ''
235        self.con.putrequest('', '')
236        self.con.endheaders()
237        res = self.con.getresponse()
238        self.assertEqual(res.status, 400)
239
240    def test_header_close(self):
241        self.con.putrequest('GET', '/')
242        self.con.putheader('Connection', 'close')
243        self.con.endheaders()
244        res = self.con.getresponse()
245        self.assertEqual(res.status, 501)
246
247    def test_head_keep_alive(self):
248        self.con._http_vsn_str = 'HTTP/1.1'
249        self.con.putrequest('GET', '/')
250        self.con.putheader('Connection', 'keep-alive')
251        self.con.endheaders()
252        res = self.con.getresponse()
253        self.assertEqual(res.status, 501)
254
255    def test_handler(self):
256        self.con.request('TEST', '/')
257        res = self.con.getresponse()
258        self.assertEqual(res.status, 204)
259
260    def test_return_header_keep_alive(self):
261        self.con.request('KEEP', '/')
262        res = self.con.getresponse()
263        self.assertEqual(res.getheader('Connection'), 'keep-alive')
264        self.con.request('TEST', '/')
265        self.addCleanup(self.con.close)
266
267    def test_internal_key_error(self):
268        self.con.request('KEYERROR', '/')
269        res = self.con.getresponse()
270        self.assertEqual(res.status, 999)
271
272    def test_return_custom_status(self):
273        self.con.request('CUSTOM', '/')
274        res = self.con.getresponse()
275        self.assertEqual(res.status, 999)
276
277
278class SimpleHTTPServerTestCase(BaseTestCase):
279    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
280        pass
281
282    def setUp(self):
283        BaseTestCase.setUp(self)
284        self.cwd = os.getcwd()
285        basetempdir = tempfile.gettempdir()
286        os.chdir(basetempdir)
287        self.data = 'We are the knights who say Ni!'
288        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
289        self.tempdir_name = os.path.basename(self.tempdir)
290        temp = open(os.path.join(self.tempdir, 'test'), 'wb')
291        temp.write(self.data)
292        temp.close()
293
294    def tearDown(self):
295        try:
296            os.chdir(self.cwd)
297            try:
298                shutil.rmtree(self.tempdir)
299            except OSError:
300                pass
301        finally:
302            BaseTestCase.tearDown(self)
303
304    def check_status_and_reason(self, response, status, data=None):
305        body = response.read()
306        self.assertTrue(response)
307        self.assertEqual(response.status, status)
308        self.assertIsNotNone(response.reason)
309        if data:
310            self.assertEqual(data, body)
311
312    def test_get(self):
313        #constructs the path relative to the root directory of the HTTPServer
314        response = self.request(self.tempdir_name + '/test')
315        self.check_status_and_reason(response, 200, data=self.data)
316        response = self.request(self.tempdir_name + '/')
317        self.check_status_and_reason(response, 200)
318        response = self.request(self.tempdir_name)
319        self.check_status_and_reason(response, 301)
320        response = self.request('/ThisDoesNotExist')
321        self.check_status_and_reason(response, 404)
322        response = self.request('/' + 'ThisDoesNotExist' + '/')
323        self.check_status_and_reason(response, 404)
324        f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
325        response = self.request('/' + self.tempdir_name + '/')
326        self.check_status_and_reason(response, 200)
327
328        # chmod() doesn't work as expected on Windows, and filesystem
329        # permissions are ignored by root on Unix.
330        if os.name == 'posix' and os.geteuid() != 0:
331            os.chmod(self.tempdir, 0)
332            response = self.request(self.tempdir_name + '/')
333            self.check_status_and_reason(response, 404)
334            os.chmod(self.tempdir, 0755)
335
336    def test_head(self):
337        response = self.request(
338            self.tempdir_name + '/test', method='HEAD')
339        self.check_status_and_reason(response, 200)
340        self.assertEqual(response.getheader('content-length'),
341                         str(len(self.data)))
342        self.assertEqual(response.getheader('content-type'),
343                         'application/octet-stream')
344
345    def test_invalid_requests(self):
346        response = self.request('/', method='FOO')
347        self.check_status_and_reason(response, 501)
348        # requests must be case sensitive,so this should fail too
349        response = self.request('/', method='get')
350        self.check_status_and_reason(response, 501)
351        response = self.request('/', method='GETs')
352        self.check_status_and_reason(response, 501)
353
354
355cgi_file1 = """\
356#!%s
357
358print "Content-type: text/html"
359print
360print "Hello World"
361"""
362
363cgi_file2 = """\
364#!%s
365import cgi
366
367print "Content-type: text/html"
368print
369
370form = cgi.FieldStorage()
371print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
372                          form.getfirst("bacon"))
373"""
374
375
376@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
377        "This test can't be run reliably as root (issue #13308).")
378class CGIHTTPServerTestCase(BaseTestCase):
379    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
380        pass
381
382    def setUp(self):
383        BaseTestCase.setUp(self)
384        self.parent_dir = tempfile.mkdtemp()
385        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
386        os.mkdir(self.cgi_dir)
387
388        # The shebang line should be pure ASCII: use symlink if possible.
389        # See issue #7668.
390        if hasattr(os, 'symlink'):
391            self.pythonexe = os.path.join(self.parent_dir, 'python')
392            os.symlink(sys.executable, self.pythonexe)
393        else:
394            self.pythonexe = sys.executable
395
396        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
397        with open(self.file1_path, 'w') as file1:
398            file1.write(cgi_file1 % self.pythonexe)
399        os.chmod(self.file1_path, 0777)
400
401        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
402        with open(self.file2_path, 'w') as file2:
403            file2.write(cgi_file2 % self.pythonexe)
404        os.chmod(self.file2_path, 0777)
405
406        self.cwd = os.getcwd()
407        os.chdir(self.parent_dir)
408
409    def tearDown(self):
410        try:
411            os.chdir(self.cwd)
412            if self.pythonexe != sys.executable:
413                os.remove(self.pythonexe)
414            os.remove(self.file1_path)
415            os.remove(self.file2_path)
416            os.rmdir(self.cgi_dir)
417            os.rmdir(self.parent_dir)
418        finally:
419            BaseTestCase.tearDown(self)
420
421    def test_url_collapse_path(self):
422        # verify tail is the last portion and head is the rest on proper urls
423        test_vectors = {
424            '': '//',
425            '..': IndexError,
426            '/.//..': IndexError,
427            '/': '//',
428            '//': '//',
429            '/\\': '//\\',
430            '/.//': '//',
431            'cgi-bin/file1.py': '/cgi-bin/file1.py',
432            '/cgi-bin/file1.py': '/cgi-bin/file1.py',
433            'a': '//a',
434            '/a': '//a',
435            '//a': '//a',
436            './a': '//a',
437            './C:/': '/C:/',
438            '/a/b': '/a/b',
439            '/a/b/': '/a/b/',
440            '/a/b/.': '/a/b/',
441            '/a/b/c/..': '/a/b/',
442            '/a/b/c/../d': '/a/b/d',
443            '/a/b/c/../d/e/../f': '/a/b/d/f',
444            '/a/b/c/../d/e/../../f': '/a/b/f',
445            '/a/b/c/../d/e/.././././..//f': '/a/b/f',
446            '../a/b/c/../d/e/.././././..//f': IndexError,
447            '/a/b/c/../d/e/../../../f': '/a/f',
448            '/a/b/c/../d/e/../../../../f': '//f',
449            '/a/b/c/../d/e/../../../../../f': IndexError,
450            '/a/b/c/../d/e/../../../../f/..': '//',
451            '/a/b/c/../d/e/../../../../f/../.': '//',
452        }
453        for path, expected in test_vectors.iteritems():
454            if isinstance(expected, type) and issubclass(expected, Exception):
455                self.assertRaises(expected,
456                                  CGIHTTPServer._url_collapse_path, path)
457            else:
458                actual = CGIHTTPServer._url_collapse_path(path)
459                self.assertEqual(expected, actual,
460                                 msg='path = %r\nGot:    %r\nWanted: %r' %
461                                 (path, actual, expected))
462
463    def test_headers_and_content(self):
464        res = self.request('/cgi-bin/file1.py')
465        self.assertEqual(('Hello World\n', 'text/html', 200),
466            (res.read(), res.getheader('Content-type'), res.status))
467
468    def test_post(self):
469        params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
470        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
471        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
472
473        self.assertEqual(res.read(), '1, python, 123456\n')
474
475    def test_invaliduri(self):
476        res = self.request('/cgi-bin/invalid')
477        res.read()
478        self.assertEqual(res.status, 404)
479
480    def test_authorization(self):
481        headers = {'Authorization' : 'Basic %s' %
482                   base64.b64encode('username:pass')}
483        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
484        self.assertEqual(('Hello World\n', 'text/html', 200),
485                (res.read(), res.getheader('Content-type'), res.status))
486
487    def test_no_leading_slash(self):
488        # http://bugs.python.org/issue2254
489        res = self.request('cgi-bin/file1.py')
490        self.assertEqual(('Hello World\n', 'text/html', 200),
491             (res.read(), res.getheader('Content-type'), res.status))
492
493    def test_os_environ_is_not_altered(self):
494        signature = "Test CGI Server"
495        os.environ['SERVER_SOFTWARE'] = signature
496        res = self.request('/cgi-bin/file1.py')
497        self.assertEqual((b'Hello World\n', 'text/html', 200),
498                (res.read(), res.getheader('Content-type'), res.status))
499        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
500
501
502class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
503    """ Test url parsing """
504    def setUp(self):
505        self.translated = os.getcwd()
506        self.translated = os.path.join(self.translated, 'filename')
507        self.handler = SocketlessRequestHandler()
508
509    def test_query_arguments(self):
510        path = self.handler.translate_path('/filename')
511        self.assertEqual(path, self.translated)
512        path = self.handler.translate_path('/filename?foo=bar')
513        self.assertEqual(path, self.translated)
514        path = self.handler.translate_path('/filename?a=b&spam=eggs#zot')
515        self.assertEqual(path, self.translated)
516
517    def test_start_with_double_slash(self):
518        path = self.handler.translate_path('//filename')
519        self.assertEqual(path, self.translated)
520        path = self.handler.translate_path('//filename?foo=bar')
521        self.assertEqual(path, self.translated)
522
523
524def test_main(verbose=None):
525    try:
526        cwd = os.getcwd()
527        test_support.run_unittest(BaseHTTPRequestHandlerTestCase,
528                                  SimpleHTTPRequestHandlerTestCase,
529                                  BaseHTTPServerTestCase,
530                                  SimpleHTTPServerTestCase,
531                                  CGIHTTPServerTestCase
532                                 )
533    finally:
534        os.chdir(cwd)
535
536if __name__ == '__main__':
537    test_main()
538