1#!/usr/bin/env python
2#
3# Copyright 2011, Google Inc.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10#     * Redistributions of source code must retain the above copyright
11# notice, this list of conditions and the following disclaimer.
12#     * Redistributions in binary form must reproduce the above
13# copyright notice, this list of conditions and the following disclaimer
14# in the documentation and/or other materials provided with the
15# distribution.
16#     * Neither the name of Google Inc. nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32
33"""Tests for dispatch module."""
34
35
36import os
37import unittest
38
39import set_sys_path  # Update sys.path to locate mod_pywebsocket module.
40
41from mod_pywebsocket import dispatch
42from mod_pywebsocket import handshake
43from test import mock
44
45
46_TEST_HANDLERS_DIR = os.path.join(
47        os.path.split(__file__)[0], 'testdata', 'handlers')
48
49_TEST_HANDLERS_SUB_DIR = os.path.join(_TEST_HANDLERS_DIR, 'sub')
50
51
52class DispatcherTest(unittest.TestCase):
53    """A unittest for dispatch module."""
54
55    def test_normalize_path(self):
56        self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
57                         dispatch._normalize_path('/a/b'))
58        self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
59                         dispatch._normalize_path('\\a\\b'))
60        self.assertEqual(os.path.abspath('/a/b').replace('\\', '/'),
61                         dispatch._normalize_path('/a/c/../b'))
62        self.assertEqual(os.path.abspath('abc').replace('\\', '/'),
63                         dispatch._normalize_path('abc'))
64
65    def test_converter(self):
66        converter = dispatch._create_path_to_resource_converter('/a/b')
67        # Python built by MSC inserts a drive name like 'C:\' via realpath().
68        # Converter Generator expands provided path using realpath() and uses
69        # the path including a drive name to verify the prefix.
70        os_root = os.path.realpath('/')
71        self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
72        self.assertEqual('/c/h', converter(os_root + 'a/b/c/h_wsh.py'))
73        self.assertEqual(None, converter(os_root + 'a/b/h.py'))
74        self.assertEqual(None, converter('a/b/h_wsh.py'))
75
76        converter = dispatch._create_path_to_resource_converter('a/b')
77        self.assertEqual('/h', converter(dispatch._normalize_path(
78            'a/b/h_wsh.py')))
79
80        converter = dispatch._create_path_to_resource_converter('/a/b///')
81        self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
82        self.assertEqual('/h', converter(dispatch._normalize_path(
83            '/a/b/../b/h_wsh.py')))
84
85        converter = dispatch._create_path_to_resource_converter(
86            '/a/../a/b/../b/')
87        self.assertEqual('/h', converter(os_root + 'a/b/h_wsh.py'))
88
89        converter = dispatch._create_path_to_resource_converter(r'\a\b')
90        self.assertEqual('/h', converter(os_root + r'a\b\h_wsh.py'))
91        self.assertEqual('/h', converter(os_root + r'a/b/h_wsh.py'))
92
93    def test_enumerate_handler_file_paths(self):
94        paths = list(
95            dispatch._enumerate_handler_file_paths(_TEST_HANDLERS_DIR))
96        paths.sort()
97        self.assertEqual(8, len(paths))
98        expected_paths = [
99                os.path.join(_TEST_HANDLERS_DIR, 'abort_by_user_wsh.py'),
100                os.path.join(_TEST_HANDLERS_DIR, 'blank_wsh.py'),
101                os.path.join(_TEST_HANDLERS_DIR, 'origin_check_wsh.py'),
102                os.path.join(_TEST_HANDLERS_DIR, 'sub',
103                             'exception_in_transfer_wsh.py'),
104                os.path.join(_TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py'),
105                os.path.join(_TEST_HANDLERS_DIR, 'sub', 'plain_wsh.py'),
106                os.path.join(_TEST_HANDLERS_DIR, 'sub',
107                             'wrong_handshake_sig_wsh.py'),
108                os.path.join(_TEST_HANDLERS_DIR, 'sub',
109                             'wrong_transfer_sig_wsh.py'),
110                ]
111        for expected, actual in zip(expected_paths, paths):
112            self.assertEqual(expected, actual)
113
114    def test_source_handler_file(self):
115        self.assertRaises(
116            dispatch.DispatchException, dispatch._source_handler_file, '')
117        self.assertRaises(
118            dispatch.DispatchException, dispatch._source_handler_file, 'def')
119        self.assertRaises(
120            dispatch.DispatchException, dispatch._source_handler_file, '1/0')
121        self.failUnless(dispatch._source_handler_file(
122                'def web_socket_do_extra_handshake(request):pass\n'
123                'def web_socket_transfer_data(request):pass\n'))
124
125    def test_source_warnings(self):
126        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
127        warnings = dispatcher.source_warnings()
128        warnings.sort()
129        expected_warnings = [
130                (os.path.realpath(os.path.join(
131                    _TEST_HANDLERS_DIR, 'blank_wsh.py')) +
132                 ': web_socket_do_extra_handshake is not defined.'),
133                (os.path.realpath(os.path.join(
134                    _TEST_HANDLERS_DIR, 'sub', 'non_callable_wsh.py')) +
135                 ': web_socket_do_extra_handshake is not callable.'),
136                (os.path.realpath(os.path.join(
137                    _TEST_HANDLERS_DIR, 'sub', 'wrong_handshake_sig_wsh.py')) +
138                 ': web_socket_do_extra_handshake is not defined.'),
139                (os.path.realpath(os.path.join(
140                    _TEST_HANDLERS_DIR, 'sub', 'wrong_transfer_sig_wsh.py')) +
141                 ': web_socket_transfer_data is not defined.'),
142                ]
143        self.assertEquals(4, len(warnings))
144        for expected, actual in zip(expected_warnings, warnings):
145            self.assertEquals(expected, actual)
146
147    def test_do_extra_handshake(self):
148        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
149        request = mock.MockRequest()
150        request.ws_resource = '/origin_check'
151        request.ws_origin = 'http://example.com'
152        dispatcher.do_extra_handshake(request)  # Must not raise exception.
153
154        request.ws_origin = 'http://bad.example.com'
155        try:
156            dispatcher.do_extra_handshake(request)
157            self.fail('Could not catch HandshakeException with 403 status')
158        except handshake.HandshakeException, e:
159            self.assertEquals(403, e.status)
160        except Exception, e:
161            self.fail('Unexpected exception: %r' % e)
162
163    def test_abort_extra_handshake(self):
164        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
165        request = mock.MockRequest()
166        request.ws_resource = '/abort_by_user'
167        self.assertRaises(handshake.AbortedByUserException,
168                          dispatcher.do_extra_handshake, request)
169
170    def test_transfer_data(self):
171        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
172
173        request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
174        request.ws_resource = '/origin_check'
175        request.ws_protocol = 'p1'
176        dispatcher.transfer_data(request)
177        self.assertEqual('origin_check_wsh.py is called for /origin_check, p1'
178                         '\xff\x00',
179                         request.connection.written_data())
180
181        request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
182        request.ws_resource = '/sub/plain'
183        request.ws_protocol = None
184        dispatcher.transfer_data(request)
185        self.assertEqual('sub/plain_wsh.py is called for /sub/plain, None'
186                         '\xff\x00',
187                         request.connection.written_data())
188
189        request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
190        request.ws_resource = '/sub/plain?'
191        request.ws_protocol = None
192        dispatcher.transfer_data(request)
193        self.assertEqual('sub/plain_wsh.py is called for /sub/plain?, None'
194                         '\xff\x00',
195                         request.connection.written_data())
196
197        request = mock.MockRequest(connection=mock.MockConn('\xff\x00'))
198        request.ws_resource = '/sub/plain?q=v'
199        request.ws_protocol = None
200        dispatcher.transfer_data(request)
201        self.assertEqual('sub/plain_wsh.py is called for /sub/plain?q=v, None'
202                         '\xff\x00',
203                         request.connection.written_data())
204
205    def test_transfer_data_no_handler(self):
206        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
207        for resource in ['/blank', '/sub/non_callable',
208                         '/sub/no_wsh_at_the_end', '/does/not/exist']:
209            request = mock.MockRequest(connection=mock.MockConn(''))
210            request.ws_resource = resource
211            request.ws_protocol = 'p2'
212            try:
213                dispatcher.transfer_data(request)
214                self.fail()
215            except dispatch.DispatchException, e:
216                self.failUnless(str(e).find('No handler') != -1)
217            except Exception:
218                self.fail()
219
220    def test_transfer_data_handler_exception(self):
221        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
222        request = mock.MockRequest(connection=mock.MockConn(''))
223        request.ws_resource = '/sub/exception_in_transfer'
224        request.ws_protocol = 'p3'
225        try:
226            dispatcher.transfer_data(request)
227            self.fail()
228        except Exception, e:
229            self.failUnless(str(e).find('Intentional') != -1,
230                            'Unexpected exception: %s' % e)
231
232    def test_abort_transfer_data(self):
233        dispatcher = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
234        request = mock.MockRequest()
235        request.ws_resource = '/abort_by_user'
236        self.assertRaises(handshake.AbortedByUserException,
237                          dispatcher.transfer_data, request)
238
239    def test_scan_dir(self):
240        disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
241        self.assertEqual(4, len(disp._handler_suite_map))
242        self.failUnless('/origin_check' in disp._handler_suite_map)
243        self.failUnless(
244            '/sub/exception_in_transfer' in disp._handler_suite_map)
245        self.failUnless('/sub/plain' in disp._handler_suite_map)
246
247    def test_scan_sub_dir(self):
248        disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, _TEST_HANDLERS_SUB_DIR)
249        self.assertEqual(2, len(disp._handler_suite_map))
250        self.failIf('/origin_check' in disp._handler_suite_map)
251        self.failUnless(
252            '/sub/exception_in_transfer' in disp._handler_suite_map)
253        self.failUnless('/sub/plain' in disp._handler_suite_map)
254
255    def test_scan_sub_dir_as_root(self):
256        disp = dispatch.Dispatcher(_TEST_HANDLERS_SUB_DIR,
257                                   _TEST_HANDLERS_SUB_DIR)
258        self.assertEqual(2, len(disp._handler_suite_map))
259        self.failIf('/origin_check' in disp._handler_suite_map)
260        self.failIf('/sub/exception_in_transfer' in disp._handler_suite_map)
261        self.failIf('/sub/plain' in disp._handler_suite_map)
262        self.failUnless('/exception_in_transfer' in disp._handler_suite_map)
263        self.failUnless('/plain' in disp._handler_suite_map)
264
265    def test_scan_dir_must_under_root(self):
266        dispatch.Dispatcher('a/b', 'a/b/c')  # OK
267        dispatch.Dispatcher('a/b///', 'a/b')  # OK
268        self.assertRaises(dispatch.DispatchException,
269                          dispatch.Dispatcher, 'a/b/c', 'a/b')
270
271    def test_resource_path_alias(self):
272        disp = dispatch.Dispatcher(_TEST_HANDLERS_DIR, None)
273        disp.add_resource_path_alias('/', '/origin_check')
274        self.assertEqual(5, len(disp._handler_suite_map))
275        self.failUnless('/origin_check' in disp._handler_suite_map)
276        self.failUnless(
277            '/sub/exception_in_transfer' in disp._handler_suite_map)
278        self.failUnless('/sub/plain' in disp._handler_suite_map)
279        self.failUnless('/' in disp._handler_suite_map)
280        self.assertRaises(dispatch.DispatchException,
281                          disp.add_resource_path_alias, '/alias', '/not-exist')
282
283
284if __name__ == '__main__':
285    unittest.main()
286
287
288# vi:sts=4 sw=4 et
289