#!/usr/bin/env python
#
# Copyright 2009, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Tests for msgutil module."""


import Queue
import unittest

import config  # This must be imported before mod_pywebsocket.
from mod_pywebsocket import msgutil

import mock


def _create_request(read_data):
    """Return a mock request whose connection is a MockConn over read_data."""
    return mock.MockRequest(connection=mock.MockConn(read_data))


def _create_blocking_request():
    """Return a mock request whose connection is a MockBlockingConn.

    The tests below feed such a connection incrementally via put_bytes().
    """
    return mock.MockRequest(connection=mock.MockBlockingConn())


class MessageTest(unittest.TestCase):
    """Tests for the module-level send/receive helpers of msgutil."""

    def test_send_message(self):
        request = _create_request('')
        msgutil.send_message(request, 'Hello')
        self.assertEqual('\x00Hello\xff', request.connection.written_data())

    def test_send_message_unicode(self):
        request = _create_request('')
        msgutil.send_message(request, u'\u65e5')
        # U+65e5 is encoded as e6,97,a5 in UTF-8
        self.assertEqual('\x00\xe6\x97\xa5\xff',
                         request.connection.written_data())

    def test_receive_message(self):
        # Two consecutive frames must come back as two separate messages.
        request = _create_request('\x00Hello\xff\x00World!\xff')
        self.assertEqual('Hello', msgutil.receive_message(request))
        self.assertEqual('World!', msgutil.receive_message(request))

    def test_receive_message_unicode(self):
        request = _create_request('\x00\xe6\x9c\xac\xff')
        # U+672c is encoded as e6,9c,ac in UTF-8
        self.assertEqual(u'\u672c', msgutil.receive_message(request))

    def test_receive_message_erroneous_unicode(self):
        # \x80 and \x81 are invalid as UTF-8.
        request = _create_request('\x00\x80\x81\xff')
        # Invalid characters should be replaced with
        # U+fffd REPLACEMENT CHARACTER
        self.assertEqual(u'\ufffd\ufffd', msgutil.receive_message(request))

    def test_receive_message_discard(self):
        # Frames that do not start with \x00 (here a \x80 frame and a \x01
        # frame) are expected to be skipped by receive_message.
        request = _create_request('\x80\x06IGNORE\x00Hello\xff'
                                  '\x01DISREGARD\xff\x00World!\xff')
        self.assertEqual('Hello', msgutil.receive_message(request))
        self.assertEqual('World!', msgutil.receive_message(request))

    def test_payload_length(self):
        # Each pair is (expected length, encoded length bytes).
        for length, encoded in ((0, '\x00'), (0x7f, '\x7f'),
                                (0x80, '\x81\x00'),
                                (0x1234, '\x80\xa4\x34')):
            self.assertEqual(
                length,
                msgutil._payload_length(_create_request(encoded)))

    def test_receive_bytes(self):
        request = _create_request('abcdefg')
        self.assertEqual('abc', msgutil._receive_bytes(request, 3))
        self.assertEqual('defg', msgutil._receive_bytes(request, 4))

    def test_read_until(self):
        # The delimiter itself is consumed but not returned.
        request = _create_request('abcXdefgX')
        self.assertEqual('abc', msgutil._read_until(request, 'X'))
        self.assertEqual('defg', msgutil._read_until(request, 'X'))


class MessageReceiverTest(unittest.TestCase):
    """Tests for msgutil.MessageReceiver on a blocking mock connection."""

    def test_queue(self):
        request = _create_blocking_request()
        receiver = msgutil.MessageReceiver(request)

        # Nothing has been fed to the connection yet.
        self.assertEqual(None, receiver.receive_nowait())

        request.connection.put_bytes('\x00Hello!\xff')
        self.assertEqual('Hello!', receiver.receive())

    def test_onmessage(self):
        onmessage_queue = Queue.Queue()

        def onmessage_handler(message):
            onmessage_queue.put(message)

        request = _create_blocking_request()
        # The receiver is not used directly; it is created so that incoming
        # messages are handed to onmessage_handler.
        receiver = msgutil.MessageReceiver(request, onmessage_handler)

        request.connection.put_bytes('\x00Hello!\xff')
        self.assertEqual('Hello!', onmessage_queue.get())


class MessageSenderTest(unittest.TestCase):
    """Tests for msgutil.MessageSender on a blocking mock connection."""

    def test_send(self):
        request = _create_blocking_request()
        sender = msgutil.MessageSender(request)

        sender.send('World')
        self.assertEqual('\x00World\xff', request.connection.written_data())

    def test_send_nowait(self):
        # Use a queue to check the bytes written by MessageSender.
        # request.connection.written_data() cannot be used here because
        # MessageSender runs in a separate thread.
        send_queue = Queue.Queue()

        def write(data):
            send_queue.put(data)

        request = _create_blocking_request()
        request.connection.write = write

        sender = msgutil.MessageSender(request)

        sender.send_nowait('Hello')
        sender.send_nowait('World')
        self.assertEqual('\x00Hello\xff', send_queue.get())
        self.assertEqual('\x00World\xff', send_queue.get())


if __name__ == '__main__':
    unittest.main()


# vi:sts=4 sw=4 et