# Copyright 2009, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Message related utilities.

Note: request.connection.write/read are used in this module, even though
mod_python document says that they should be used only in connection handlers.
Unfortunately, we have no other options. For example, request.write/read are
not suitable because they don't allow direct raw bytes writing/reading.
"""


try:
    import Queue
except ImportError:
    # Python 3 renamed the module; keep the historical name so the rest of
    # this file can keep referring to Queue.Queue / Queue.Empty.
    import queue as Queue
import threading
import util


class MsgUtilException(Exception):
    """Raised when a message cannot be received from the peer."""
    pass


def _read(request, length):
    """Read up to length raw bytes from the connection of request.

    Args:
        request: mod_python request.
        length: maximum number of bytes to read.

    Returns:
        the non-empty byte string returned by request.connection.read.

    Raises:
        MsgUtilException: when the read returns nothing.
    """
    data = request.connection.read(length)
    if not data:
        raise MsgUtilException(
                'Failed to receive message from %r' %
                (request.connection.remote_addr,))
    return data


def _write(request, bytes):
    """Write raw bytes to the connection of request.

    Any exception raised by the write is handed to
    util.prepend_message_to_exception together with a prefix naming the
    remote address, and is then re-raised.

    Args:
        request: mod_python request.
        bytes: byte string to write. (The parameter name shadows the
            builtin; it is kept for interface compatibility.)
    """
    try:
        request.connection.write(bytes)
    except Exception as e:
        util.prepend_message_to_exception(
                'Failed to send message to %r: ' %
                (request.connection.remote_addr,),
                e)
        raise


def send_message(request, message):
    """Send message.

    The message is UTF-8 encoded and framed between a 0x00 byte and a
    0xff byte.

    Args:
        request: mod_python request.
        message: unicode string to send.
    """

    # Byte literals keep the concatenation valid where str and bytes are
    # distinct types.
    _write(request, b'\x00' + message.encode('utf-8') + b'\xff')


def receive_message(request):
    """Receive a Web Socket frame and return its payload as unicode string.

    Frames whose type is not 0x00 are read and discarded; the function keeps
    reading until a 0x00 frame arrives.

    Args:
        request: mod_python request.

    Returns:
        payload of the first 0x00 frame, decoded as UTF-8.

    Raises:
        MsgUtilException: propagated from _read when nothing can be read.
    """

    while True:
        # Read 1 byte.
        # mp_conn.read will block if no bytes are available.
        # Timeout is controlled by TimeOut directive of Apache.
        frame_type_str = _read(request, 1)
        # Slice rather than index so ord() always receives a 1-byte string.
        frame_type = ord(frame_type_str[0:1])
        if (frame_type & 0x80) == 0x80:
            # The payload length is specified in the frame.
            # Read and discard.
            length = _payload_length(request)
            _receive_bytes(request, length)
        else:
            # The payload is delimited with \xff.
            data = _read_until(request, b'\xff')
            if frame_type == 0x00:
                # The Web Socket protocol section 4.4 specifies that invalid
                # characters must be replaced with U+fffd REPLACEMENT
                # CHARACTER.
                return data.decode('utf-8', 'replace')
            # Discard data of other types.


def _payload_length(request):
    """Read the length field of a length-prefixed frame.

    The length is read one byte at a time. The low 7 bits of each byte are
    appended to the accumulated value (most significant group first), and
    a byte whose high bit (0x80) is clear ends the field.

    Args:
        request: mod_python request.

    Returns:
        the decoded length as an integer.
    """
    length = 0
    while True:
        b_str = _read(request, 1)
        # Slice rather than index so ord() always receives a 1-byte string.
        b = ord(b_str[0:1])
        length = length * 128 + (b & 0x7f)
        if (b & 0x80) == 0:
            break
    return length


def _receive_bytes(request, length):
    """Read exactly length bytes from the connection of request.

    _read may return fewer bytes than requested, so reads are repeated
    until the requested amount has been collected.

    Args:
        request: mod_python request.
        length: number of bytes to read.

    Returns:
        the collected bytes as a single byte string (empty if length <= 0).
    """
    chunks = []
    while length > 0:
        new_bytes = _read(request, length)
        chunks.append(new_bytes)
        length -= len(new_bytes)
    return b''.join(chunks)


def _read_until(request, delim_char):
    """Read bytes one at a time until delim_char is seen.

    Args:
        request: mod_python request.
        delim_char: 1-byte string that terminates the read.

    Returns:
        the bytes read before the delimiter; the delimiter itself is
        consumed but not included.
    """
    chunks = []
    while True:
        ch = _read(request, 1)
        if ch == delim_char:
            break
        chunks.append(ch)
    return b''.join(chunks)


class MessageReceiver(threading.Thread):
    """This class receives messages from the client.

    This class provides three ways to receive messages: blocking, non-blocking,
    and via callback. Callback has the highest precedence.

    Note: This class should not be used with the standalone server for wss
    because pyOpenSSL used by the server raises a fatal error if the socket
    is accessed from multiple threads.
    """
    def __init__(self, request, onmessage=None):
        """Construct an instance.

        The receiving thread is marked as a daemon and started immediately.

        Args:
            request: mod_python request.
            onmessage: a function to be called when a message is received.
                       May be None. If not None, the function is called on
                       another thread. In that case, MessageReceiver.receive
                       and MessageReceiver.receive_nowait are useless because
                       they will never return any messages.
        """
        threading.Thread.__init__(self)
        self._request = request
        self._queue = Queue.Queue()
        self._onmessage = onmessage
        self._stop_requested = False
        self.daemon = True
        self.start()

    def run(self):
        """Receive messages until stop is requested.

        Each message goes to the onmessage callback when one was given,
        otherwise onto the internal queue. An exception raised by
        receive_message (or by the callback) propagates and ends the loop.
        """
        while not self._stop_requested:
            message = receive_message(self._request)
            if self._onmessage:
                self._onmessage(message)
            else:
                self._queue.put(message)

    def receive(self):
        """ Receive a message from the channel, blocking.

        Returns:
            message as a unicode string.
        """
        return self._queue.get()

    def receive_nowait(self):
        """ Receive a message from the channel, non-blocking.

        Returns:
            message as a unicode string if available. None otherwise.
        """
        try:
            message = self._queue.get_nowait()
        except Queue.Empty:
            message = None
        return message

    def stop(self):
        """Request to stop this instance.

        The instance will be stopped after receiving the next message.
        This method may not be very useful, but there is no clean way
        in Python to forcefully stop a running thread.
        """
        self._stop_requested = True


class MessageSender(threading.Thread):
    """This class sends messages to the client.

    This class provides both synchronous and asynchronous ways to send
    messages.

    Note: This class should not be used with the standalone server for wss
    because pyOpenSSL used by the server raises a fatal error if the socket
    is accessed from multiple threads.
    """
    def __init__(self, request):
        """Construct an instance.

        The sending thread is marked as a daemon and started immediately.

        Args:
            request: mod_python request.
        """
        threading.Thread.__init__(self)
        self._request = request
        self._queue = Queue.Queue()
        self.daemon = True
        self.start()

    def run(self):
        """Send queued messages one by one, notifying each one's condition.

        Every queue entry is a (message, condition) pair. The condition is
        notified and released even when send_message raises, so that a
        caller blocked in send() is not left waiting forever. The exception
        itself still propagates and ends this thread.
        """
        while True:
            message, condition = self._queue.get()
            # Blocks until a synchronous sender has entered wait(), which
            # guarantees the notify below cannot be missed.
            condition.acquire()
            try:
                send_message(self._request, message)
            finally:
                condition.notify()
                condition.release()

    def send(self, message):
        """Send a message, blocking.

        Returns once the sending thread has processed the message.
        """

        condition = threading.Condition()
        # Hold the condition before queueing so the sending thread cannot
        # notify before this thread is waiting.
        condition.acquire()
        try:
            self._queue.put((message, condition))
            condition.wait()
        finally:
            # wait() re-acquires the lock before returning; give it back.
            condition.release()

    def send_nowait(self, message):
        """Send a message, non-blocking.

        The message is only queued; nobody waits on the condition that is
        queued with it.
        """

        self._queue.put((message, threading.Condition()))


# vi:sts=4 sw=4 et