15c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# Copyright 2011, Google Inc.
25c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# All rights reserved.
35c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#
45c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# Redistribution and use in source and binary forms, with or without
55c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# modification, are permitted provided that the following conditions are
65c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# met:
75c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#
85c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#     * Redistributions of source code must retain the above copyright
95c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# notice, this list of conditions and the following disclaimer.
105c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#     * Redistributions in binary form must reproduce the above
115c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# copyright notice, this list of conditions and the following disclaimer
125c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# in the documentation and/or other materials provided with the
135c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# distribution.
145c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#     * Neither the name of Google Inc. nor the names of its
155c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# contributors may be used to endorse or promote products derived from
165c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# this software without specific prior written permission.
175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)#
185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
205c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
215c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
225c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
235c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
245c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
255c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
265c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
275c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
285c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
295c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
305c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
"""Message related utilities.

Note: request.connection.write/read are used in this module, even though
the mod_python documentation says that they should be used only in
connection handlers. Unfortunately, we have no other options. For example,
request.write/read are not suitable because they don't allow direct raw
bytes writing/reading.
"""
395c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
405c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
415c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import Queue
425c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)import threading
435c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
445c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
455c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# Export Exception symbols from msgutil for backward compatibility
465c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)from mod_pywebsocket._stream_base import ConnectionTerminatedException
475c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)from mod_pywebsocket._stream_base import InvalidFrameException
485c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)from mod_pywebsocket._stream_base import BadOperationException
495c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)from mod_pywebsocket._stream_base import UnsupportedFrameException
505c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
515c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
# An API for handlers to send and receive WebSocket messages.
def close_connection(request):
    """Shut down the WebSocket connection attached to a request.

    The work is delegated to the close_connection method of the stream
    object stored on the request as ws_stream.

    Args:
        request: mod_python request.
    """
    stream = request.ws_stream
    stream.close_connection()
605c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
615c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
def send_message(request, payload_data, end=True, binary=False):
    """Send a whole message, or one fragment of a message, to the client.

    The arguments are forwarded positionally to the send_message method of
    the stream object stored on the request as ws_stream.

    Args:
        request: mod_python request.
        payload_data: unicode text or str binary to send.
        end: True if this call completes the message. False if
             payload_data is only a part of a message that a later
             send_message call with end=True will complete.
        binary: True to send payload_data as binary frame(s).
    Raises:
        BadOperationException: when the server has already terminated.
    """
    stream = request.ws_stream
    stream.send_message(payload_data, end, binary)
765c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
775c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
def receive_message(request):
    """Receive one WebSocket message and hand back its payload.

    The result is whatever the receive_message method of the stream stored
    on the request as ws_stream returns: a text payload as unicode or a
    binary payload as str.

    Args:
        request: mod_python request.
    Raises:
        InvalidFrameException:     when the client sends an invalid frame.
        UnsupportedFrameException: when the client sends an unsupported
                                   frame, e.g. a reserved bit is set but no
                                   extension can recognize it.
        InvalidUTF8Exception:      when the client sends a text frame
                                   containing an invalid UTF-8 string.
        ConnectionTerminatedException: when the connection is closed
                                   unexpectedly.
        BadOperationException:     when the client has already terminated.
    """
    stream = request.ws_stream
    return stream.receive_message()
965c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
975c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
def send_ping(request, body=''):
    """Ask the request's WebSocket stream to send a ping.

    Args:
        request: mod_python request.
        body: value passed through to the stream's send_ping method.
              Defaults to an empty string.
    """
    stream = request.ws_stream
    stream.send_ping(body)
1005c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1015c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
class MessageReceiver(threading.Thread):
    """Background thread that pulls messages from the client.

    Received messages can be consumed in three ways: a blocking call
    (receive), a non-blocking call (receive_nowait), or a callback given to
    the constructor. When a callback is supplied it takes precedence and
    nothing is ever queued for the other two.

    Note: This class should not be used with the standalone server for wss
    because pyOpenSSL used by the server raises a fatal error if the socket
    is accessed from multiple threads.
    """

    def __init__(self, request, onmessage=None):
        """Set up the receiver and immediately start its thread.

        Args:
            request: mod_python request.
            onmessage: optional function invoked with each received
                       message. It runs on the receiver thread, not on the
                       caller's thread. When it is given,
                       MessageReceiver.receive and
                       MessageReceiver.receive_nowait never return any
                       messages because nothing is placed on the queue.
        """

        threading.Thread.__init__(self)
        self._stop_requested = False
        self._onmessage = onmessage
        self._queue = Queue.Queue()
        self._request = request
        # Daemon thread: it must not keep the process alive on its own.
        self.setDaemon(True)
        self.start()

    def run(self):
        """Receive messages until a stop is requested or an error occurs.

        Each message goes to the callback if one was supplied, otherwise to
        the internal queue. The connection is closed when the loop exits,
        whether normally or through an exception.
        """
        try:
            while True:
                if self._stop_requested:
                    break
                received = receive_message(self._request)
                if not self._onmessage:
                    self._queue.put(received)
                    continue
                self._onmessage(received)
        finally:
            close_connection(self._request)

    def receive(self):
        """Wait for the next queued message and return it.

        Returns:
            the oldest message on the queue, as produced by
            receive_message.
        """
        next_message = self._queue.get()
        return next_message

    def receive_nowait(self):
        """Return the next queued message without waiting.

        Returns:
            the oldest message on the queue if there is one. None
            otherwise.
        """
        try:
            return self._queue.get_nowait()
        except Queue.Empty:
            return None

    def stop(self):
        """Ask the receiver thread to finish.

        The flag is only checked between messages, so the thread exits
        after the receive call that is currently in progress returns. This
        method may not be very useful, but there is no clean way in Python
        to forcefully stop a running thread.
        """
        self._stop_requested = True
1725c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
1735c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
class MessageSender(threading.Thread):
    """This class sends messages to the client.

    Messages are handed to a background thread through a queue. Callers
    may either wait until the message has been sent (send) or return
    immediately after queueing it (send_nowait).

    Note: This class should not be used with the standalone server for wss
    because pyOpenSSL used by the server raises a fatal error if the socket
    is accessed from multiple threads.
    """

    def __init__(self, request):
        """Construct an instance and immediately start its thread.

        Args:
            request: mod_python request.
        """
        threading.Thread.__init__(self)
        self._request = request
        # Holds (message, condition) pairs waiting to be sent.
        self._queue = Queue.Queue()
        self.setDaemon(True)
        self.start()

    def run(self):
        """Send queued messages one by one, notifying any waiting sender.

        An exception raised by send_message propagates and ends the
        thread; in that case the waiter is not notified.
        """
        while True:
            message, condition = self._queue.get()
            # For a blocking send() this acquire cannot succeed until the
            # caller is inside condition.wait(), so the notify below is
            # never delivered before the caller starts waiting.
            condition.acquire()
            try:
                send_message(self._request, message)
                condition.notify()
            finally:
                # Always pair the acquire with a release, even when
                # send_message raises.
                condition.release()

    def send(self, message):
        """Send a message, blocking until the sender thread has sent it.

        Args:
            message: the message to pass to send_message.
        """

        condition = threading.Condition()
        # Take the lock before queueing so the sender thread cannot notify
        # until wait() has released it.
        condition.acquire()
        try:
            self._queue.put((message, condition))
            condition.wait()
        finally:
            # wait() re-acquires the lock before returning; release it so
            # this method does not return with the lock still held.
            condition.release()

    def send_nowait(self, message):
        """Send a message, non-blocking.

        The message is queued for the sender thread and this method
        returns without waiting for it to be sent.

        Args:
            message: the message to pass to send_message.
        """

        # Nobody waits on this condition; it only keeps the queue entries
        # uniform for run().
        self._queue.put((message, threading.Condition()))
2175c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2185c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)
2195c87bf8b86a7c82ef50fb7a89697d8e02e2553beTorne (Richard Coles)# vi:sts=4 sw=4 et
220