1#!/usr/bin/env python
2#
3# Copyright 2009, Google Inc.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10#     * Redistributions of source code must retain the above copyright
11# notice, this list of conditions and the following disclaimer.
12#     * Redistributions in binary form must reproduce the above
13# copyright notice, this list of conditions and the following disclaimer
14# in the documentation and/or other materials provided with the
15# distribution.
16#     * Neither the name of Google Inc. nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32
33"""Tests for msgutil module."""
34
35
36import Queue
37import unittest
38
39import config  # This must be imported before mod_pywebsocket.
40from mod_pywebsocket import msgutil
41
42import mock
43
44
def _create_request(read_data):
    """Return a mock request backed by a mock connection.

    read_data is passed unchanged to mock.MockConn, and the resulting
    connection object is attached to a new mock.MockRequest.
    """
    connection = mock.MockConn(read_data)
    return mock.MockRequest(connection=connection)
47
def _create_blocking_request():
    """Return a mock request backed by a mock.MockBlockingConn.

    The tests below feed data to the connection after construction via
    its put_bytes() method.
    """
    connection = mock.MockBlockingConn()
    return mock.MockRequest(connection=connection)
50
class MessageTest(unittest.TestCase):
    """Tests for the module-level send/receive helpers of msgutil."""

    def test_send_message(self):
        # A byte-string message is expected on the wire between a \x00
        # and a \xff byte.
        req = _create_request('')
        msgutil.send_message(req, 'Hello')
        written = req.connection.written_data()
        self.assertEqual('\x00Hello\xff', written)

    def test_send_message_unicode(self):
        req = _create_request('')
        msgutil.send_message(req, u'\u65e5')
        written = req.connection.written_data()
        # The UTF-8 encoding of U+65e5 is the byte sequence e6 97 a5.
        self.assertEqual('\x00\xe6\x97\xa5\xff', written)

    def test_receive_message(self):
        # Two frames queued back to back come out one per call, in order.
        req = _create_request('\x00Hello\xff\x00World!\xff')
        for expected in ('Hello', 'World!'):
            self.assertEqual(expected, msgutil.receive_message(req))

    def test_receive_message_unicode(self):
        # e6 9c ac is the UTF-8 encoding of U+672c.
        req = _create_request('\x00\xe6\x9c\xac\xff')
        received = msgutil.receive_message(req)
        self.assertEqual(u'\u672c', received)

    def test_receive_message_erroneous_unicode(self):
        # Neither \x80 nor \x81 is valid UTF-8 on its own.
        req = _create_request('\x00\x80\x81\xff')
        received = msgutil.receive_message(req)
        # Each invalid byte should come back as U+fffd REPLACEMENT CHARACTER.
        self.assertEqual(u'\ufffd\ufffd', received)

    def test_receive_message_discard(self):
        # The stream interleaves a \x80 frame and a \x01 frame with the
        # two \x00 frames; only the payloads of the \x00 frames are
        # expected back from receive_message.
        stream = ('\x80\x06IGNORE\x00Hello\xff'
                  '\x01DISREGARD\xff\x00World!\xff')
        req = _create_request(stream)
        for expected in ('Hello', 'World!'):
            self.assertEqual(expected, msgutil.receive_message(req))

    def test_payload_length(self):
        # Pairs of (expected length, encoded length bytes).
        cases = (
            (0, '\x00'),
            (0x7f, '\x7f'),
            (0x80, '\x81\x00'),
            (0x1234, '\x80\xa4\x34'),
        )
        for expected_length, encoded in cases:
            req = _create_request(encoded)
            self.assertEqual(expected_length, msgutil._payload_length(req))

    def test_receive_bytes(self):
        # Consecutive reads consume the stream from where the last one ended.
        req = _create_request('abcdefg')
        for count, expected in ((3, 'abc'), (4, 'defg')):
            self.assertEqual(expected, msgutil._receive_bytes(req, count))

    def test_read_until(self):
        # Each call returns the data before the next 'X'; the second
        # result shows the delimiter itself was consumed by the first call.
        req = _create_request('abcXdefgX')
        for expected in ('abc', 'defg'):
            self.assertEqual(expected, msgutil._read_until(req, 'X'))
102
103
class MessageReceiverTest(unittest.TestCase):
    """Tests for msgutil.MessageReceiver on a blocking mock connection."""

    # Upper bound, in seconds, on how long test_onmessage waits for the
    # handler to be invoked. Without a bound, a receiver that never calls
    # the handler would hang the whole test run instead of failing.
    _WAIT_TIMEOUT_SEC = 10

    def test_queue(self):
        request = _create_blocking_request()
        receiver = msgutil.MessageReceiver(request)

        # No bytes have been fed to the connection yet, so the
        # non-blocking receive is expected to return None.
        self.assertEqual(None, receiver.receive_nowait())

        request.connection.put_bytes('\x00Hello!\xff')
        self.assertEqual('Hello!', receiver.receive())

    def test_onmessage(self):
        # The handler hands each message over through a queue so the
        # test can wait for it (presumably it is invoked from another
        # thread -- TODO confirm against MessageReceiver).
        onmessage_queue = Queue.Queue()
        def onmessage_handler(message):
            onmessage_queue.put(message)

        request = _create_blocking_request()
        receiver = msgutil.MessageReceiver(request, onmessage_handler)

        request.connection.put_bytes('\x00Hello!\xff')
        try:
            message = onmessage_queue.get(timeout=self._WAIT_TIMEOUT_SEC)
        except Queue.Empty:
            self.fail('onmessage handler was not called within %d seconds'
                      % self._WAIT_TIMEOUT_SEC)
        self.assertEqual('Hello!', message)
124
125
class MessageSenderTest(unittest.TestCase):
    """Tests for msgutil.MessageSender on a blocking mock connection."""

    # Upper bound, in seconds, on how long test_send_nowait waits for each
    # write. Without a bound, a sender that never writes would hang the
    # whole test run instead of failing.
    _WAIT_TIMEOUT_SEC = 10

    def _next_written(self, send_queue):
        """Return the next chunk put on send_queue, failing on timeout."""
        try:
            return send_queue.get(timeout=self._WAIT_TIMEOUT_SEC)
        except Queue.Empty:
            self.fail('MessageSender wrote nothing within %d seconds'
                      % self._WAIT_TIMEOUT_SEC)

    def test_send(self):
        request = _create_blocking_request()
        sender = msgutil.MessageSender(request)

        sender.send('World')
        self.assertEqual('\x00World\xff', request.connection.written_data())

    def test_send_nowait(self):
        # Use a queue to check the bytes written by MessageSender.
        # request.connection.written_data() cannot be used here because
        # MessageSender runs in a separate thread.
        send_queue = Queue.Queue()
        def write(data):
            send_queue.put(data)
        request = _create_blocking_request()
        request.connection.write = write

        sender = msgutil.MessageSender(request)

        sender.send_nowait('Hello')
        sender.send_nowait('World')
        # Messages are expected to be written in the order they were queued.
        self.assertEqual('\x00Hello\xff', self._next_written(send_queue))
        self.assertEqual('\x00World\xff', self._next_written(send_queue))
150
151
# Run every test case in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
154
155
156# vi:sts=4 sw=4 et
157