1# Copyright 2012, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8#     * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10#     * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14#     * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31"""This file provides classes and helper functions for parsing/building frames
32of the WebSocket protocol (RFC 6455).
33
34Specification:
35http://tools.ietf.org/html/rfc6455
36"""
37
38
39from collections import deque
40import logging
41import os
42import struct
43import time
44
45from mod_pywebsocket import common
46from mod_pywebsocket import util
47from mod_pywebsocket._stream_base import BadOperationException
48from mod_pywebsocket._stream_base import ConnectionTerminatedException
49from mod_pywebsocket._stream_base import InvalidFrameException
50from mod_pywebsocket._stream_base import InvalidUTF8Exception
51from mod_pywebsocket._stream_base import StreamBase
52from mod_pywebsocket._stream_base import UnsupportedFrameException
53
54
# Shared masker that parse_frame() uses in place of a RepeatedXorMasker when
# the mask bit of the received frame is not set.
_NOOP_MASKER = util.NoopMasker()
56
57
class Frame(object):
    """Plain holder for the header fields and payload of one frame.

    The constructor stores its arguments as attributes of the same name
    without validating them:
        fin, rsv1, rsv2, rsv3: header flag bits.
        opcode: frame opcode.
        payload: payload data.
    """

    def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
                 opcode=None, payload=''):
        # Flag bits.
        self.fin, self.rsv1, self.rsv2, self.rsv3 = fin, rsv1, rsv2, rsv3
        # Frame type and content.
        self.opcode, self.payload = opcode, payload
68
69
70# Helper functions made public to be used for writing unittests for WebSocket
71# clients.
72
73
def create_length_header(length, mask):
    """Builds the payload length part of a frame header.

    The mask flag is stored in the top bit of the first octet. A length of
    up to 125 is stored in the remaining 7 bits. Larger lengths put 126 or
    127 there and append the length as a 2-octet or 8-octet network byte
    order integer respectively.

    Args:
        length: Frame length. Must be less than 2^63.
        mask: Mask bit. Must be boolean.

    Returns:
        The encoded length header.

    Raises:
        ValueError: when bad data is given.
    """

    # Reject everything that cannot be encoded before doing any work.
    if length < 0:
        raise ValueError('length must be non negative integer')
    if length >= (1 << 63):
        raise ValueError('Payload is too big for one frame')

    mask_bit = (1 << 7) if mask else 0

    if length <= 125:
        return chr(mask_bit | length)
    if length < (1 << 16):
        return chr(mask_bit | 126) + struct.pack('!H', length)
    return chr(mask_bit | 127) + struct.pack('!Q', length)
100
101
def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    """Builds a frame header up to and including the payload length.

    The first octet packs FIN, RSV1, RSV2, RSV3 and the 4-bit opcode. It is
    followed by the output of create_length_header(). The masking key is not
    part of the returned header.

    Args:
        opcode: frame opcode, 0 to 0xf.
        payload_length: payload length, 0 or more and less than 2^63.
        fin: FIN bit, 0 or 1.
        rsv1: RSV1 bit, 0 or 1.
        rsv2: RSV2 bit, 0 or 1.
        rsv3: RSV3 bit, 0 or 1.
        mask: whether to set the mask bit in the length header.

    Returns:
        The encoded header.

    Raises:
        ValueError: when bad data is given.
    """

    if opcode < 0 or opcode > 0xf:
        raise ValueError('Opcode out of range')

    if payload_length < 0 or payload_length >= (1 << 63):
        raise ValueError('payload_length out of range')

    # Any bit other than the lowest one being set in any of the flags means
    # that a value other than 0 or 1 was given.
    combined_flags = fin | rsv1 | rsv2 | rsv3
    if combined_flags & ~1:
        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')

    flags_and_opcode = (
        (fin << 7) | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4) | opcode)

    return chr(flags_and_opcode) + create_length_header(payload_length, mask)
127
128
129def _build_frame(header, body, mask):
130    if not mask:
131        return header + body
132
133    masking_nonce = os.urandom(4)
134    masker = util.RepeatedXorMasker(masking_nonce)
135
136    return header + masking_nonce + masker.mask(body)
137
138
def _filter_and_format_frame_object(frame, mask, frame_filters):
    """Applies frame_filters to frame in order and serializes the result.

    Each filter's filter() method is called with the frame object itself.
    The header is built from the frame's fields as they are after all the
    filters have run.
    """

    for current_filter in frame_filters:
        current_filter.filter(frame)

    return _build_frame(
        create_header(
            frame.opcode, len(frame.payload), frame.fin,
            frame.rsv1, frame.rsv2, frame.rsv3, mask),
        frame.payload,
        mask)
147
148
def create_binary_frame(
    message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
    """Creates a frame carrying message as is, with all reserved bits unset.

    The frame is passed through frame_filters before being serialized, so
    filters may still alter it.
    """

    return _filter_and_format_frame_object(
        Frame(fin=fin, opcode=opcode, payload=message), mask, frame_filters)
155
156
def create_text_frame(
    message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
    """Creates a frame whose payload is message encoded in UTF-8.

    Everything other than the encoding step is delegated to
    create_binary_frame().
    """

    return create_binary_frame(
        message.encode('utf-8'), opcode, fin, mask, frame_filters)
164
165
def _compute_throughput_mb_per_sec(byte_count, elapsed_seconds):
    """Returns the transfer rate in MB/s for use in log messages.

    The measured interval may be zero (tiny payload, coarse clock) or even
    negative (clock adjusted backwards). Dividing by it would raise
    ZeroDivisionError or yield a meaningless value, so in that case infinity
    is returned, or 0.0 if no bytes were processed at all.
    """

    if elapsed_seconds <= 0:
        if byte_count:
            return float('inf')
        return 0.0
    return byte_count / elapsed_seconds / 1000 / 1000


def parse_frame(receive_bytes, logger=None,
                ws_version=common.VERSION_HYBI_LATEST,
                unmask_receive=True):
    """Parses a frame. Returns a tuple containing each header field and
    payload.

    Args:
        receive_bytes: a function that reads frame data from a stream or
            something similar. The function takes length of the bytes to be
            read. The function must raise ConnectionTerminatedException if
            there is not enough data to be read.
        logger: a logging object. The root logger is used if omitted.
        ws_version: the version of WebSocket protocol. Used only to decide
            whether a non-minimal payload length encoding is worth a warning
            (version 13 or later).
        unmask_receive: whether received frames are expected to be masked.
            When the mask bit of the received frame doesn't match this
            value, InvalidFrameException is raised.

    Returns:
        A tuple (opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3).

    Raises:
        ConnectionTerminatedException: when receive_bytes raises it.
        InvalidFrameException: when the frame contains invalid data.
    """

    if not logger:
        logger = logging.getLogger()

    # Evaluated once so that a timer is always started whenever its result is
    # going to be logged.
    fine_logging = logger.isEnabledFor(common.LOGLEVEL_FINE)

    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')

    received = receive_bytes(2)

    first_byte = ord(received[0])
    fin = (first_byte >> 7) & 1
    rsv1 = (first_byte >> 6) & 1
    rsv2 = (first_byte >> 5) & 1
    rsv3 = (first_byte >> 4) & 1
    opcode = first_byte & 0xf

    second_byte = ord(received[1])
    mask = (second_byte >> 7) & 1
    payload_length = second_byte & 0x7f

    logger.log(common.LOGLEVEL_FINE,
               'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
               'Mask=%s, Payload_length=%s',
               fin, rsv1, rsv2, rsv3, opcode, mask, payload_length)

    if (mask == 1) != unmask_receive:
        raise InvalidFrameException(
            'Mask bit on the received frame did\'nt match masking '
            'configuration for received frames')

    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    # into the 8-octet extended payload length field (or 0x0-0x7D in
    # 2-octet field). Such frames are still accepted here; only a warning is
    # logged.
    valid_length_encoding = True
    length_encoding_bytes = 1
    if payload_length == 127:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 8-octet extended payload length')

        extended_payload_length = receive_bytes(8)
        payload_length = struct.unpack(
            '!Q', extended_payload_length)[0]
        if payload_length > 0x7FFFFFFFFFFFFFFF:
            raise InvalidFrameException(
                'Extended payload length >= 2^63')
        if ws_version >= 13 and payload_length < 0x10000:
            valid_length_encoding = False
            length_encoding_bytes = 8

        logger.log(common.LOGLEVEL_FINE,
                   'Decoded_payload_length=%s', payload_length)
    elif payload_length == 126:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 2-octet extended payload length')

        extended_payload_length = receive_bytes(2)
        payload_length = struct.unpack(
            '!H', extended_payload_length)[0]
        if ws_version >= 13 and payload_length < 126:
            valid_length_encoding = False
            length_encoding_bytes = 2

        logger.log(common.LOGLEVEL_FINE,
                   'Decoded_payload_length=%s', payload_length)

    if not valid_length_encoding:
        logger.warning(
            'Payload length is not encoded using the minimal number of '
            'bytes (%d is encoded using %d bytes)',
            payload_length,
            length_encoding_bytes)

    if mask == 1:
        logger.log(common.LOGLEVEL_FINE, 'Receive mask')

        masking_nonce = receive_bytes(4)
        masker = util.RepeatedXorMasker(masking_nonce)

        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    else:
        masker = _NOOP_MASKER

    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    if fine_logging:
        receive_start = time.time()

    raw_payload_bytes = receive_bytes(payload_length)

    if fine_logging:
        logger.log(
            common.LOGLEVEL_FINE,
            'Done receiving payload data at %s MB/s',
            _compute_throughput_mb_per_sec(
                payload_length, time.time() - receive_start))
    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')

    if fine_logging:
        unmask_start = time.time()

    unmasked_bytes = masker.mask(raw_payload_bytes)

    if fine_logging:
        logger.log(
            common.LOGLEVEL_FINE,
            'Done unmasking payload data at %s MB/s',
            _compute_throughput_mb_per_sec(
                payload_length, time.time() - unmask_start))

    return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3
292
293
class FragmentedFrameBuilder(object):
    """A stateful class to send a message as fragments.

    The first call of build() for a message produces a frame with the
    message's own opcode (text or binary). Subsequent calls produce
    continuation frames until build() is called with end set to true, which
    finishes the message.
    """

    def __init__(self, mask, frame_filters=None, encode_utf8=True):
        """Constructs an instance.

        Args:
            mask: whether to mask the frames to build.
            frame_filters: filters applied to every frame built. The given
                list object itself is kept (not copied). If omitted, a new
                empty list is used.
            encode_utf8: whether to encode payload data of text frames in
                UTF-8.
        """

        self._mask = mask
        # Create a fresh list for each instance instead of using a list as
        # the default value, which would be shared by all the instances
        # constructed without frame_filters.
        if frame_filters is None:
            frame_filters = []
        self._frame_filters = frame_filters
        # This is for skipping UTF-8 encoding when building text type frames
        # from compressed data.
        self._encode_utf8 = encode_utf8

        self._started = False

        # Hold opcode of the first frame in messages to verify types of other
        # frames in the message are all the same.
        self._opcode = common.OPCODE_TEXT

    def build(self, payload_data, end, binary):
        """Builds the next frame of the current message.

        Args:
            payload_data: payload data of the frame.
            end: whether this frame is the last one of the message.
            binary: whether the message is a binary message.

        Returns:
            The serialized frame.

        Raises:
            ValueError: when binary doesn't match the type of the message in
                progress. Frame creation may raise other exceptions. In any
                of these cases the state of the builder is left unchanged.
        """

        if binary:
            frame_type = common.OPCODE_BINARY
        else:
            frame_type = common.OPCODE_TEXT

        if self._started:
            if self._opcode != frame_type:
                raise ValueError('Message types are different in frames for '
                                 'the same message')
            opcode = common.OPCODE_CONTINUATION
        else:
            opcode = frame_type

        if end:
            fin = 1
        else:
            fin = 0

        if binary or not self._encode_utf8:
            frame = create_binary_frame(
                payload_data, opcode, fin, self._mask, self._frame_filters)
        else:
            frame = create_text_frame(
                payload_data, opcode, fin, self._mask, self._frame_filters)

        # Update the fragmentation state only after the frame has been built
        # successfully. Otherwise a failure above (e.g. an encoding error)
        # would leave the builder believing that a fragment had been sent,
        # and the next message would start with a continuation frame.
        self._opcode = frame_type
        self._started = not end

        return frame
339
340
def _create_control_frame(opcode, body, mask, frame_filters):
    """Creates a control frame after applying frame_filters to it.

    Raises:
        BadOperationException: when the payload is longer than 125 bytes
            after the filters have been applied.
    """

    control_frame = Frame(opcode=opcode, payload=body)

    for current_filter in frame_filters:
        current_filter.filter(control_frame)

    # The size is checked after filtering as filters may alter the payload.
    payload_length = len(control_frame.payload)
    if payload_length > 125:
        raise BadOperationException(
            'Payload data size of control frames must be 125 bytes or less')

    return _build_frame(
        create_header(
            control_frame.opcode, payload_length, control_frame.fin,
            control_frame.rsv1, control_frame.rsv2, control_frame.rsv3,
            mask),
        control_frame.payload,
        mask)
355
356
def create_ping_frame(body, mask=False, frame_filters=[]):
    """Creates a ping frame having body as its payload."""

    ping_opcode = common.OPCODE_PING
    return _create_control_frame(ping_opcode, body, mask, frame_filters)
359
360
def create_pong_frame(body, mask=False, frame_filters=[]):
    """Creates a pong frame having body as its payload."""

    pong_opcode = common.OPCODE_PONG
    return _create_control_frame(pong_opcode, body, mask, frame_filters)
363
364
def create_close_frame(body, mask=False, frame_filters=[]):
    """Creates a close frame having body as its payload."""

    close_opcode = common.OPCODE_CLOSE
    return _create_control_frame(close_opcode, body, mask, frame_filters)
368
369
def create_closing_handshake_body(code, reason):
    """Creates the body of a close frame from a status code and a reason.

    Args:
        code: status code. If None, an empty body is returned and reason is
            not used.
        reason: close reason. Encoded in UTF-8 and placed after the code.

    Returns:
        The 2-octet status code in network byte order followed by the encoded
        reason, or an empty string when code is None.

    Raises:
        BadOperationException: when code is out of range or is one of the
            reserved pseudo codes.
    """

    if code is None:
        return ''

    in_range = (code <= common.STATUS_USER_PRIVATE_MAX and
                code >= common.STATUS_NORMAL_CLOSURE)
    if not in_range:
        raise BadOperationException('Status code is out of range')

    reserved_pseudo_codes = (
        common.STATUS_NO_STATUS_RECEIVED,
        common.STATUS_ABNORMAL_CLOSURE,
        common.STATUS_TLS_HANDSHAKE)
    if code in reserved_pseudo_codes:
        raise BadOperationException('Status code is reserved pseudo code')

    reason_octets = reason.encode('utf-8')
    return struct.pack('!H', code) + reason_octets
384
385
class StreamOptions(object):
    """Holds option values to configure Stream objects.

    Every instance starts with four separate empty filter lists, UTF-8
    encoding of text messages enabled, masking of sent frames disabled and
    unmasking of received frames enabled.
    """

    def __init__(self):
        """Constructs StreamOptions."""

        # Filters applied to frames.
        self.outgoing_frame_filters, self.incoming_frame_filters = [], []

        # Filters applied to messages. Control frames are not affected by them.
        self.outgoing_message_filters, self.incoming_message_filters = [], []

        self.encode_text_message_to_utf8 = True

        # Masking configuration for each direction.
        self.mask_send, self.unmask_receive = False, True
403
404
class Stream(StreamBase):
    """A class for parsing/building frames of the WebSocket protocol
    (RFC 6455).
    """

    def __init__(self, request, options):
        """Constructs an instance.

        Args:
            request: mod_python request.
            options: object holding option values for this stream (see
                StreamOptions for the attributes read).
        """

        StreamBase.__init__(self, request)

        self._logger = util.get_class_logger(self)

        self._options = options

        self._request.client_terminated = False
        self._request.server_terminated = False

        # Holds body of received fragments.
        self._received_fragments = []
        # Holds the opcode of the first fragment.
        self._original_opcode = None

        self._writer = FragmentedFrameBuilder(
            self._options.mask_send, self._options.outgoing_frame_filters,
            self._options.encode_text_message_to_utf8)

        # Bodies of the pings sent by send_ping() and not acknowledged yet,
        # in the order they were sent.
        self._ping_queue = deque()

    def _receive_frame(self):
        """Receives a frame and return data in the frame as a tuple containing
        each header field and payload separately.

        Raises:
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid data.
        """

        return parse_frame(receive_bytes=self.receive_bytes,
                           logger=self._logger,
                           ws_version=self._request.ws_version,
                           unmask_receive=self._options.unmask_receive)

    def _receive_frame_as_frame_object(self):
        """Receives a frame and returns it as a Frame object."""

        opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()

        return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
                     opcode=opcode, payload=unmasked_bytes)

    def receive_filtered_frame(self):
        """Receives a frame and applies frame filters and message filters.
        The frame to be received must satisfy following conditions:
        - The frame is not fragmented.
        - The opcode of the frame is TEXT or BINARY.

        DO NOT USE this method except for testing purpose.

        Returns:
            The received Frame object after filtering.

        Raises:
            InvalidFrameException: when the frame doesn't satisfy the
                conditions above.
        """

        frame = self._receive_frame_as_frame_object()
        if not frame.fin:
            raise InvalidFrameException(
                'Segmented frames must not be received via '
                'receive_filtered_frame()')
        if (frame.opcode != common.OPCODE_TEXT and
            frame.opcode != common.OPCODE_BINARY):
            raise InvalidFrameException(
                'Control frames must not be received via '
                'receive_filtered_frame()')

        for frame_filter in self._options.incoming_frame_filters:
            frame_filter.filter(frame)
        for message_filter in self._options.incoming_message_filters:
            frame.payload = message_filter.filter(frame.payload)
        return frame

    def send_message(self, message, end=True, binary=False):
        """Send message.

        Args:
            message: text in unicode or binary in str to send.
            end: whether this is the last fragment of the message. Pass
                False to send the message in multiple calls.
            binary: send message as binary frame.

        Raises:
            BadOperationException: when called on a server-terminated
                connection or called with inconsistent message type or
                binary parameter.
        """

        if self._request.server_terminated:
            raise BadOperationException(
                'Requested send_message after sending out a closing handshake')

        if binary and isinstance(message, unicode):
            raise BadOperationException(
                'Message for binary frame must be instance of str')

        for message_filter in self._options.outgoing_message_filters:
            message = message_filter.filter(message, end, binary)

        try:
            # Set this to any positive integer to limit maximum size of data in
            # payload data of each frame.
            MAX_PAYLOAD_DATA_SIZE = -1

            if MAX_PAYLOAD_DATA_SIZE <= 0:
                self._write(self._writer.build(message, end, binary))
                return

            bytes_written = 0
            while True:
                end_for_this_frame = end
                bytes_to_write = len(message) - bytes_written
                if (MAX_PAYLOAD_DATA_SIZE > 0 and
                    bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
                    end_for_this_frame = False
                    bytes_to_write = MAX_PAYLOAD_DATA_SIZE

                frame = self._writer.build(
                    message[bytes_written:bytes_written + bytes_to_write],
                    end_for_this_frame,
                    binary)
                self._write(frame)

                bytes_written += bytes_to_write

                # This if must be placed here (the end of while block) so that
                # at least one frame is sent.
                if len(message) <= bytes_written:
                    break
        except ValueError as e:
            raise BadOperationException(e)

    def _get_message_from_frame(self, frame):
        """Gets a message from frame. If the message is composed of fragmented
        frames and the frame is not the last fragmented frame, this method
        returns None. The whole message will be returned when the last
        fragmented frame is passed to this method.

        NOTE(review): while a fragmented message is pending, every frame
        other than a continuation frame is rejected here. That includes
        control frames, though RFC 6455 section 5.4 allows them to be
        injected in the middle of a fragmented message.

        Raises:
            InvalidFrameException: when the frame doesn't match defragmentation
                context, or the frame contains invalid data.
        """

        if frame.opcode == common.OPCODE_CONTINUATION:
            if not self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received a termination frame but fragmentation '
                        'not started')
                else:
                    raise InvalidFrameException(
                        'Received an intermediate frame but '
                        'fragmentation not started')

            if frame.fin:
                # End of fragmentation frame
                self._received_fragments.append(frame.payload)
                message = ''.join(self._received_fragments)
                self._received_fragments = []
                return message
            else:
                # Intermediate frame
                self._received_fragments.append(frame.payload)
                return None
        else:
            if self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received an unfragmented frame without '
                        'terminating existing fragmentation')
                else:
                    raise InvalidFrameException(
                        'New fragmentation started without terminating '
                        'existing fragmentation')

            if frame.fin:
                # Unfragmented frame

                self._original_opcode = frame.opcode
                return frame.payload
            else:
                # Start of fragmentation frame

                if common.is_control_opcode(frame.opcode):
                    raise InvalidFrameException(
                        'Control frames must not be fragmented')

                self._original_opcode = frame.opcode
                self._received_fragments.append(frame.payload)
                return None

    def _process_close_message(self, message):
        """Processes close message.

        Records the close code and reason on the request and, unless the
        closing handshake was initiated by this side, sends a close frame
        back.

        Args:
            message: close message.

        Raises:
            InvalidFrameException: when the message is invalid.
        """

        self._request.client_terminated = True

        # Status code is optional. We can have status reason only if we
        # have status code. Status reason can be empty string. So,
        # allowed cases are
        # - no application data: no code no reason
        # - 2 octet of application data: has code but no reason
        # - 3 or more octet of application data: both code and reason
        if len(message) == 0:
            self._logger.debug('Received close frame (empty body)')
            self._request.ws_close_code = (
                common.STATUS_NO_STATUS_RECEIVED)
        elif len(message) == 1:
            raise InvalidFrameException(
                'If a close frame has status code, the length of '
                'status code must be 2 octet')
        else:
            self._request.ws_close_code = struct.unpack(
                '!H', message[0:2])[0]
            self._request.ws_close_reason = message[2:].decode(
                'utf-8', 'replace')
            self._logger.debug(
                'Received close frame (code=%d, reason=%r)',
                self._request.ws_close_code,
                self._request.ws_close_reason)

        # As we've received a close frame, no more data is coming over the
        # socket. We can now safely close the socket without worrying about
        # RST sending.

        if self._request.server_terminated:
            self._logger.debug(
                'Received ack for server-initiated closing handshake')
            return

        self._logger.debug(
            'Received client-initiated closing handshake')

        code = common.STATUS_NORMAL_CLOSURE
        reason = ''
        if hasattr(self._request, '_dispatcher'):
            dispatcher = self._request._dispatcher
            code, reason = dispatcher.passive_closing_handshake(
                self._request)
            if code is None and reason is not None and len(reason) > 0:
                self._logger.warning(
                    'Handler specified reason despite code being None')
                reason = ''
            if reason is None:
                reason = ''
        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Acknowledged closing handshake initiated by the peer '
            '(code=%r, reason=%r)', code, reason)

    def _process_ping_message(self, message):
        """Processes ping message.

        If the request has on_ping_handler set, the handler is called and
        is responsible for responding. Otherwise a pong having the same body
        is sent back.

        Args:
            message: ping message.
        """

        # Only absence of the attribute is tolerated here. The handler is
        # called outside of any try block so that an AttributeError raised
        # inside the handler is not mistaken for a missing handler.
        handler = getattr(self._request, 'on_ping_handler', None)
        if handler:
            handler(self._request, message)
            return
        self._send_pong(message)

    def _process_pong_message(self, message):
        """Processes pong message.

        Removes the ping acknowledged by the pong, together with all the
        pings sent before it, from the ping queue. Then calls
        on_pong_handler of the request if it's set.

        Args:
            message: pong message.
        """

        # TODO(tyoshino): Add ping timeout handling.

        inflight_pings = deque()

        while True:
            try:
                expected_body = self._ping_queue.popleft()
                if expected_body == message:
                    # inflight_pings contains pings ignored by the
                    # other peer. Just forget them.
                    self._logger.debug(
                        'Ping %r is acked (%d pings were ignored)',
                        expected_body, len(inflight_pings))
                    break
                else:
                    inflight_pings.append(expected_body)
            except IndexError:
                # The received pong was unsolicited pong. Keep the
                # ping queue as is.
                self._ping_queue = inflight_pings
                self._logger.debug('Received a unsolicited pong')
                break

        # As in _process_ping_message, call the handler outside of any try
        # block so that an AttributeError raised inside it propagates.
        handler = getattr(self._request, 'on_pong_handler', None)
        if handler:
            handler(self._request, message)

    def receive_message(self):
        """Receive a WebSocket frame and return its payload as a text in
        unicode or a binary in str.

        Ping and pong frames are processed internally and receiving
        continues until a data message or a close frame arrives.

        Returns:
            payload data of the frame
            - as unicode instance if received text frame
            - as str instance if received binary frame
            or None iff received closing handshake.
        Raises:
            BadOperationException: when called on a client-terminated
                connection.
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid
                data.
            InvalidUTF8Exception: when a text message is not valid UTF-8.
            UnsupportedFrameException: when the received frame has
                flags, opcode we cannot handle. You can ignore this
                exception and continue receiving the next frame.
        """

        if self._request.client_terminated:
            raise BadOperationException(
                'Requested receive_message after receiving a closing '
                'handshake')

        while True:
            # mp_conn.read will block if no bytes are available.
            # Timeout is controlled by TimeOut directive of Apache.

            frame = self._receive_frame_as_frame_object()

            # Check the constraint on the payload size for control frames
            # before extension processes the frame.
            # See also http://tools.ietf.org/html/rfc6455#section-5.5
            if (common.is_control_opcode(frame.opcode) and
                len(frame.payload) > 125):
                raise InvalidFrameException(
                    'Payload data size of control frames must be 125 bytes or '
                    'less')

            for frame_filter in self._options.incoming_frame_filters:
                frame_filter.filter(frame)

            if frame.rsv1 or frame.rsv2 or frame.rsv3:
                raise UnsupportedFrameException(
                    'Unsupported flag is set (rsv = %d%d%d)' %
                    (frame.rsv1, frame.rsv2, frame.rsv3))

            message = self._get_message_from_frame(frame)
            if message is None:
                continue

            for message_filter in self._options.incoming_message_filters:
                message = message_filter.filter(message)

            if self._original_opcode == common.OPCODE_TEXT:
                # Text messages must be valid UTF-8. Invalid data is not
                # replaced with U+fffd REPLACEMENT CHARACTER but reported to
                # the caller as InvalidUTF8Exception.
                try:
                    return message.decode('utf-8')
                except UnicodeDecodeError as e:
                    raise InvalidUTF8Exception(e)
            elif self._original_opcode == common.OPCODE_BINARY:
                return message
            elif self._original_opcode == common.OPCODE_CLOSE:
                self._process_close_message(message)
                return None
            elif self._original_opcode == common.OPCODE_PING:
                self._process_ping_message(message)
            elif self._original_opcode == common.OPCODE_PONG:
                self._process_pong_message(message)
            else:
                raise UnsupportedFrameException(
                    'Opcode %d is not supported' % self._original_opcode)

    def _send_closing_handshake(self, code, reason):
        """Sends a close frame and marks the connection as
        server-terminated.
        """

        body = create_closing_handshake_body(code, reason)
        frame = create_close_frame(
            body, mask=self._options.mask_send,
            frame_filters=self._options.outgoing_frame_filters)

        # Set the flag before writing so that it's set even if the write
        # fails.
        self._request.server_terminated = True

        self._write(frame)

    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason='',
                         wait_response=True):
        """Closes a WebSocket connection.

        Does nothing if a closing handshake has already been sent.

        Args:
            code: Status code for close frame. If code is None, a close
                frame with empty body will be sent.
            reason: string representing close reason.
            wait_response: True when caller want to wait the response.
        Raises:
            BadOperationException: when reason is specified with code None,
                or when code is specified and reason is neither a str nor
                a unicode.
            ConnectionTerminatedException: when a message other than a
                close frame is received while waiting for the response.
        """

        if self._request.server_terminated:
            self._logger.debug(
                'Requested close_connection but server is already terminated')
            return

        if code is None:
            if reason is not None and len(reason) > 0:
                raise BadOperationException(
                    'close reason must not be specified if code is None')
            reason = ''
        else:
            if not isinstance(reason, str) and not isinstance(reason, unicode):
                raise BadOperationException(
                    'close reason must be an instance of str or unicode')

        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Initiated closing handshake (code=%r, reason=%r)',
            code, reason)

        if (code == common.STATUS_GOING_AWAY or
            code == common.STATUS_PROTOCOL_ERROR) or not wait_response:
            # It doesn't make sense to wait for a close frame if the reason is
            # protocol error or that the server is going away. For some of
            # other reasons, it might not make sense to wait for a close frame,
            # but it's not clear, yet.
            return

        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
        # or until a server-defined timeout expires.
        #
        # For now, we expect receiving closing handshake right after sending
        # out closing handshake.
        message = self.receive_message()
        if message is not None:
            raise ConnectionTerminatedException(
                'Didn\'t receive valid ack for closing handshake')
        # TODO: 3. close the WebSocket connection.
        # note: mod_python Connection (mp_conn) doesn't have close method.

    def send_ping(self, body=''):
        """Sends a ping frame and remembers its body to match it against
        pongs received later.
        """

        frame = create_ping_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

        self._ping_queue.append(body)

    def _send_pong(self, body):
        """Sends a pong frame having body as its payload."""

        frame = create_pong_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

    def get_last_received_opcode(self):
        """Returns the opcode of the WebSocket message which the last received
        frame belongs to. The return value is valid iff immediately after
        receive_message call.
        """

        return self._original_opcode
885
886
887# vi:sts=4 sw=4 et
888