1# Copyright 2012, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8#     * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10#     * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14#     * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31"""This file provides classes and helper functions for parsing/building frames
32of the WebSocket protocol (RFC 6455).
33
34Specification:
35http://tools.ietf.org/html/rfc6455
36"""
37
38
39from collections import deque
40import logging
41import os
42import struct
43import time
44
45from mod_pywebsocket import common
46from mod_pywebsocket import util
47from mod_pywebsocket._stream_base import BadOperationException
48from mod_pywebsocket._stream_base import ConnectionTerminatedException
49from mod_pywebsocket._stream_base import InvalidFrameException
50from mod_pywebsocket._stream_base import InvalidUTF8Exception
51from mod_pywebsocket._stream_base import StreamBase
52from mod_pywebsocket._stream_base import UnsupportedFrameException
53
54
55_NOOP_MASKER = util.NoopMasker()
56
57
58class Frame(object):
59
60    def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
61                 opcode=None, payload=''):
62        self.fin = fin
63        self.rsv1 = rsv1
64        self.rsv2 = rsv2
65        self.rsv3 = rsv3
66        self.opcode = opcode
67        self.payload = payload
68
69
70# Helper functions made public to be used for writing unittests for WebSocket
71# clients.
72
73
74def create_length_header(length, mask):
75    """Creates a length header.
76
77    Args:
78        length: Frame length. Must be less than 2^63.
79        mask: Mask bit. Must be boolean.
80
81    Raises:
82        ValueError: when bad data is given.
83    """
84
85    if mask:
86        mask_bit = 1 << 7
87    else:
88        mask_bit = 0
89
90    if length < 0:
91        raise ValueError('length must be non negative integer')
92    elif length <= 125:
93        return chr(mask_bit | length)
94    elif length < (1 << 16):
95        return chr(mask_bit | 126) + struct.pack('!H', length)
96    elif length < (1 << 63):
97        return chr(mask_bit | 127) + struct.pack('!Q', length)
98    else:
99        raise ValueError('Payload is too big for one frame')
100
101
102def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
103    """Creates a frame header.
104
105    Raises:
106        Exception: when bad data is given.
107    """
108
109    if opcode < 0 or 0xf < opcode:
110        raise ValueError('Opcode out of range')
111
112    if payload_length < 0 or (1 << 63) <= payload_length:
113        raise ValueError('payload_length out of range')
114
115    if (fin | rsv1 | rsv2 | rsv3) & ~1:
116        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')
117
118    header = ''
119
120    first_byte = ((fin << 7)
121                  | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
122                  | opcode)
123    header += chr(first_byte)
124    header += create_length_header(payload_length, mask)
125
126    return header
127
128
129def _build_frame(header, body, mask):
130    if not mask:
131        return header + body
132
133    masking_nonce = os.urandom(4)
134    masker = util.RepeatedXorMasker(masking_nonce)
135
136    return header + masking_nonce + masker.mask(body)
137
138
139def _filter_and_format_frame_object(frame, mask, frame_filters):
140    for frame_filter in frame_filters:
141        frame_filter.filter(frame)
142
143    header = create_header(
144        frame.opcode, len(frame.payload), frame.fin,
145        frame.rsv1, frame.rsv2, frame.rsv3, mask)
146    return _build_frame(header, frame.payload, mask)
147
148
149def create_binary_frame(
150    message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
151    """Creates a simple binary frame with no extension, reserved bit."""
152
153    frame = Frame(fin=fin, opcode=opcode, payload=message)
154    return _filter_and_format_frame_object(frame, mask, frame_filters)
155
156
157def create_text_frame(
158    message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
159    """Creates a simple text frame with no extension, reserved bit."""
160
161    encoded_message = message.encode('utf-8')
162    return create_binary_frame(encoded_message, opcode, fin, mask,
163                               frame_filters)
164
165
166def parse_frame(receive_bytes, logger=None,
167                ws_version=common.VERSION_HYBI_LATEST,
168                unmask_receive=True):
169    """Parses a frame. Returns a tuple containing each header field and
170    payload.
171
172    Args:
173        receive_bytes: a function that reads frame data from a stream or
174            something similar. The function takes length of the bytes to be
175            read. The function must raise ConnectionTerminatedException if
176            there is not enough data to be read.
177        logger: a logging object.
178        ws_version: the version of WebSocket protocol.
179        unmask_receive: unmask received frames. When received unmasked
180            frame, raises InvalidFrameException.
181
182    Raises:
183        ConnectionTerminatedException: when receive_bytes raises it.
184        InvalidFrameException: when the frame contains invalid data.
185    """
186
187    if not logger:
188        logger = logging.getLogger()
189
190    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')
191
192    received = receive_bytes(2)
193
194    first_byte = ord(received[0])
195    fin = (first_byte >> 7) & 1
196    rsv1 = (first_byte >> 6) & 1
197    rsv2 = (first_byte >> 5) & 1
198    rsv3 = (first_byte >> 4) & 1
199    opcode = first_byte & 0xf
200
201    second_byte = ord(received[1])
202    mask = (second_byte >> 7) & 1
203    payload_length = second_byte & 0x7f
204
205    logger.log(common.LOGLEVEL_FINE,
206               'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
207               'Mask=%s, Payload_length=%s',
208               fin, rsv1, rsv2, rsv3, opcode, mask, payload_length)
209
210    if (mask == 1) != unmask_receive:
211        raise InvalidFrameException(
212            'Mask bit on the received frame did\'nt match masking '
213            'configuration for received frames')
214
215    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
216    # into the 8-octet extended payload length field (or 0x0-0xFD in
217    # 2-octet field).
218    valid_length_encoding = True
219    length_encoding_bytes = 1
220    if payload_length == 127:
221        logger.log(common.LOGLEVEL_FINE,
222                   'Receive 8-octet extended payload length')
223
224        extended_payload_length = receive_bytes(8)
225        payload_length = struct.unpack(
226            '!Q', extended_payload_length)[0]
227        if payload_length > 0x7FFFFFFFFFFFFFFF:
228            raise InvalidFrameException(
229                'Extended payload length >= 2^63')
230        if ws_version >= 13 and payload_length < 0x10000:
231            valid_length_encoding = False
232            length_encoding_bytes = 8
233
234        logger.log(common.LOGLEVEL_FINE,
235                   'Decoded_payload_length=%s', payload_length)
236    elif payload_length == 126:
237        logger.log(common.LOGLEVEL_FINE,
238                   'Receive 2-octet extended payload length')
239
240        extended_payload_length = receive_bytes(2)
241        payload_length = struct.unpack(
242            '!H', extended_payload_length)[0]
243        if ws_version >= 13 and payload_length < 126:
244            valid_length_encoding = False
245            length_encoding_bytes = 2
246
247        logger.log(common.LOGLEVEL_FINE,
248                   'Decoded_payload_length=%s', payload_length)
249
250    if not valid_length_encoding:
251        logger.warning(
252            'Payload length is not encoded using the minimal number of '
253            'bytes (%d is encoded using %d bytes)',
254            payload_length,
255            length_encoding_bytes)
256
257    if mask == 1:
258        logger.log(common.LOGLEVEL_FINE, 'Receive mask')
259
260        masking_nonce = receive_bytes(4)
261        masker = util.RepeatedXorMasker(masking_nonce)
262
263        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
264    else:
265        masker = _NOOP_MASKER
266
267    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
268    if logger.isEnabledFor(common.LOGLEVEL_FINE):
269        receive_start = time.time()
270
271    raw_payload_bytes = receive_bytes(payload_length)
272
273    if logger.isEnabledFor(common.LOGLEVEL_FINE):
274        logger.log(
275            common.LOGLEVEL_FINE,
276            'Done receiving payload data at %s MB/s',
277            payload_length / (time.time() - receive_start) / 1000 / 1000)
278    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')
279
280    if logger.isEnabledFor(common.LOGLEVEL_FINE):
281        unmask_start = time.time()
282
283    bytes = masker.mask(raw_payload_bytes)
284
285    if logger.isEnabledFor(common.LOGLEVEL_FINE):
286        logger.log(
287            common.LOGLEVEL_FINE,
288            'Done unmasking payload data at %s MB/s',
289            payload_length / (time.time() - unmask_start) / 1000 / 1000)
290
291    return opcode, bytes, fin, rsv1, rsv2, rsv3
292
293
294class FragmentedFrameBuilder(object):
295    """A stateful class to send a message as fragments."""
296
297    def __init__(self, mask, frame_filters=[], encode_utf8=True):
298        """Constructs an instance."""
299
300        self._mask = mask
301        self._frame_filters = frame_filters
302        # This is for skipping UTF-8 encoding when building text type frames
303        # from compressed data.
304        self._encode_utf8 = encode_utf8
305
306        self._started = False
307
308        # Hold opcode of the first frame in messages to verify types of other
309        # frames in the message are all the same.
310        self._opcode = common.OPCODE_TEXT
311
312    def build(self, message, end, binary):
313        if binary:
314            frame_type = common.OPCODE_BINARY
315        else:
316            frame_type = common.OPCODE_TEXT
317        if self._started:
318            if self._opcode != frame_type:
319                raise ValueError('Message types are different in frames for '
320                                 'the same message')
321            opcode = common.OPCODE_CONTINUATION
322        else:
323            opcode = frame_type
324            self._opcode = frame_type
325
326        if end:
327            self._started = False
328            fin = 1
329        else:
330            self._started = True
331            fin = 0
332
333        if binary or not self._encode_utf8:
334            return create_binary_frame(
335                message, opcode, fin, self._mask, self._frame_filters)
336        else:
337            return create_text_frame(
338                message, opcode, fin, self._mask, self._frame_filters)
339
340
341def _create_control_frame(opcode, body, mask, frame_filters):
342    frame = Frame(opcode=opcode, payload=body)
343
344    for frame_filter in frame_filters:
345        frame_filter.filter(frame)
346
347    if len(frame.payload) > 125:
348        raise BadOperationException(
349            'Payload data size of control frames must be 125 bytes or less')
350
351    header = create_header(
352        frame.opcode, len(frame.payload), frame.fin,
353        frame.rsv1, frame.rsv2, frame.rsv3, mask)
354    return _build_frame(header, frame.payload, mask)
355
356
357def create_ping_frame(body, mask=False, frame_filters=[]):
358    return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
359
360
361def create_pong_frame(body, mask=False, frame_filters=[]):
362    return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
363
364
365def create_close_frame(body, mask=False, frame_filters=[]):
366    return _create_control_frame(
367        common.OPCODE_CLOSE, body, mask, frame_filters)
368
369
370def create_closing_handshake_body(code, reason):
371    body = ''
372    if code is not None:
373        if (code > common.STATUS_USER_PRIVATE_MAX or
374            code < common.STATUS_NORMAL_CLOSURE):
375            raise BadOperationException('Status code is out of range')
376        if (code == common.STATUS_NO_STATUS_RECEIVED or
377            code == common.STATUS_ABNORMAL_CLOSURE or
378            code == common.STATUS_TLS_HANDSHAKE):
379            raise BadOperationException('Status code is reserved pseudo '
380                'code')
381        encoded_reason = reason.encode('utf-8')
382        body = struct.pack('!H', code) + encoded_reason
383    return body
384
385
386class StreamOptions(object):
387    """Holds option values to configure Stream objects."""
388
389    def __init__(self):
390        """Constructs StreamOptions."""
391
392        # Enables deflate-stream extension.
393        self.deflate_stream = False
394
395        # Filters applied to frames.
396        self.outgoing_frame_filters = []
397        self.incoming_frame_filters = []
398
399        # Filters applied to messages. Control frames are not affected by them.
400        self.outgoing_message_filters = []
401        self.incoming_message_filters = []
402
403        self.encode_text_message_to_utf8 = True
404        self.mask_send = False
405        self.unmask_receive = True
406        # RFC6455 disallows fragmented control frames, but mux extension
407        # relaxes the restriction.
408        self.allow_fragmented_control_frame = False
409
410
411class Stream(StreamBase):
412    """A class for parsing/building frames of the WebSocket protocol
413    (RFC 6455).
414    """
415
416    def __init__(self, request, options):
417        """Constructs an instance.
418
419        Args:
420            request: mod_python request.
421        """
422
423        StreamBase.__init__(self, request)
424
425        self._logger = util.get_class_logger(self)
426
427        self._options = options
428
429        if self._options.deflate_stream:
430            self._logger.debug('Setup filter for deflate-stream')
431            self._request = util.DeflateRequest(self._request)
432
433        self._request.client_terminated = False
434        self._request.server_terminated = False
435
436        # Holds body of received fragments.
437        self._received_fragments = []
438        # Holds the opcode of the first fragment.
439        self._original_opcode = None
440
441        self._writer = FragmentedFrameBuilder(
442            self._options.mask_send, self._options.outgoing_frame_filters,
443            self._options.encode_text_message_to_utf8)
444
445        self._ping_queue = deque()
446
447    def _receive_frame(self):
448        """Receives a frame and return data in the frame as a tuple containing
449        each header field and payload separately.
450
451        Raises:
452            ConnectionTerminatedException: when read returns empty
453                string.
454            InvalidFrameException: when the frame contains invalid data.
455        """
456
457        def _receive_bytes(length):
458            return self.receive_bytes(length)
459
460        return parse_frame(receive_bytes=_receive_bytes,
461                           logger=self._logger,
462                           ws_version=self._request.ws_version,
463                           unmask_receive=self._options.unmask_receive)
464
465    def _receive_frame_as_frame_object(self):
466        opcode, bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
467
468        return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
469                     opcode=opcode, payload=bytes)
470
471    def send_message(self, message, end=True, binary=False):
472        """Send message.
473
474        Args:
475            message: text in unicode or binary in str to send.
476            binary: send message as binary frame.
477
478        Raises:
479            BadOperationException: when called on a server-terminated
480                connection or called with inconsistent message type or
481                binary parameter.
482        """
483
484        if self._request.server_terminated:
485            raise BadOperationException(
486                'Requested send_message after sending out a closing handshake')
487
488        if binary and isinstance(message, unicode):
489            raise BadOperationException(
490                'Message for binary frame must be instance of str')
491
492        for message_filter in self._options.outgoing_message_filters:
493            message = message_filter.filter(message, end, binary)
494
495        try:
496            self._write(self._writer.build(message, end, binary))
497        except ValueError, e:
498            raise BadOperationException(e)
499
500    def _get_message_from_frame(self, frame):
501        """Gets a message from frame. If the message is composed of fragmented
502        frames and the frame is not the last fragmented frame, this method
503        returns None. The whole message will be returned when the last
504        fragmented frame is passed to this method.
505
506        Raises:
507            InvalidFrameException: when the frame doesn't match defragmentation
508                context, or the frame contains invalid data.
509        """
510
511        if frame.opcode == common.OPCODE_CONTINUATION:
512            if not self._received_fragments:
513                if frame.fin:
514                    raise InvalidFrameException(
515                        'Received a termination frame but fragmentation '
516                        'not started')
517                else:
518                    raise InvalidFrameException(
519                        'Received an intermediate frame but '
520                        'fragmentation not started')
521
522            if frame.fin:
523                # End of fragmentation frame
524                self._received_fragments.append(frame.payload)
525                message = ''.join(self._received_fragments)
526                self._received_fragments = []
527                return message
528            else:
529                # Intermediate frame
530                self._received_fragments.append(frame.payload)
531                return None
532        else:
533            if self._received_fragments:
534                if frame.fin:
535                    raise InvalidFrameException(
536                        'Received an unfragmented frame without '
537                        'terminating existing fragmentation')
538                else:
539                    raise InvalidFrameException(
540                        'New fragmentation started without terminating '
541                        'existing fragmentation')
542
543            if frame.fin:
544                # Unfragmented frame
545
546                self._original_opcode = frame.opcode
547                return frame.payload
548            else:
549                # Start of fragmentation frame
550
551                if (not self._options.allow_fragmented_control_frame and
552                    common.is_control_opcode(frame.opcode)):
553                    raise InvalidFrameException(
554                        'Control frames must not be fragmented')
555
556                self._original_opcode = frame.opcode
557                self._received_fragments.append(frame.payload)
558                return None
559
560    def _process_close_message(self, message):
561        """Processes close message.
562
563        Args:
564            message: close message.
565
566        Raises:
567            InvalidFrameException: when the message is invalid.
568        """
569
570        self._request.client_terminated = True
571
572        # Status code is optional. We can have status reason only if we
573        # have status code. Status reason can be empty string. So,
574        # allowed cases are
575        # - no application data: no code no reason
576        # - 2 octet of application data: has code but no reason
577        # - 3 or more octet of application data: both code and reason
578        if len(message) == 0:
579            self._logger.debug('Received close frame (empty body)')
580            self._request.ws_close_code = (
581                common.STATUS_NO_STATUS_RECEIVED)
582        elif len(message) == 1:
583            raise InvalidFrameException(
584                'If a close frame has status code, the length of '
585                'status code must be 2 octet')
586        elif len(message) >= 2:
587            self._request.ws_close_code = struct.unpack(
588                '!H', message[0:2])[0]
589            self._request.ws_close_reason = message[2:].decode(
590                'utf-8', 'replace')
591            self._logger.debug(
592                'Received close frame (code=%d, reason=%r)',
593                self._request.ws_close_code,
594                self._request.ws_close_reason)
595
596        # Drain junk data after the close frame if necessary.
597        self._drain_received_data()
598
599        if self._request.server_terminated:
600            self._logger.debug(
601                'Received ack for server-initiated closing handshake')
602            return
603
604        self._logger.debug(
605            'Received client-initiated closing handshake')
606
607        code = common.STATUS_NORMAL_CLOSURE
608        reason = ''
609        if hasattr(self._request, '_dispatcher'):
610            dispatcher = self._request._dispatcher
611            code, reason = dispatcher.passive_closing_handshake(
612                self._request)
613            if code is None and reason is not None and len(reason) > 0:
614                self._logger.warning(
615                    'Handler specified reason despite code being None')
616                reason = ''
617            if reason is None:
618                reason = ''
619        self._send_closing_handshake(code, reason)
620        self._logger.debug(
621            'Sent ack for client-initiated closing handshake '
622            '(code=%r, reason=%r)', code, reason)
623
624    def _process_ping_message(self, message):
625        """Processes ping message.
626
627        Args:
628            message: ping message.
629        """
630
631        try:
632            handler = self._request.on_ping_handler
633            if handler:
634                handler(self._request, message)
635                return
636        except AttributeError, e:
637            pass
638        self._send_pong(message)
639
640    def _process_pong_message(self, message):
641        """Processes pong message.
642
643        Args:
644            message: pong message.
645        """
646
647        # TODO(tyoshino): Add ping timeout handling.
648
649        inflight_pings = deque()
650
651        while True:
652            try:
653                expected_body = self._ping_queue.popleft()
654                if expected_body == message:
655                    # inflight_pings contains pings ignored by the
656                    # other peer. Just forget them.
657                    self._logger.debug(
658                        'Ping %r is acked (%d pings were ignored)',
659                        expected_body, len(inflight_pings))
660                    break
661                else:
662                    inflight_pings.append(expected_body)
663            except IndexError, e:
664                # The received pong was unsolicited pong. Keep the
665                # ping queue as is.
666                self._ping_queue = inflight_pings
667                self._logger.debug('Received a unsolicited pong')
668                break
669
670        try:
671            handler = self._request.on_pong_handler
672            if handler:
673                handler(self._request, message)
674        except AttributeError, e:
675            pass
676
677    def receive_message(self):
678        """Receive a WebSocket frame and return its payload as a text in
679        unicode or a binary in str.
680
681        Returns:
682            payload data of the frame
683            - as unicode instance if received text frame
684            - as str instance if received binary frame
685            or None iff received closing handshake.
686        Raises:
687            BadOperationException: when called on a client-terminated
688                connection.
689            ConnectionTerminatedException: when read returns empty
690                string.
691            InvalidFrameException: when the frame contains invalid
692                data.
693            UnsupportedFrameException: when the received frame has
694                flags, opcode we cannot handle. You can ignore this
695                exception and continue receiving the next frame.
696        """
697
698        if self._request.client_terminated:
699            raise BadOperationException(
700                'Requested receive_message after receiving a closing '
701                'handshake')
702
703        while True:
704            # mp_conn.read will block if no bytes are available.
705            # Timeout is controlled by TimeOut directive of Apache.
706
707            frame = self._receive_frame_as_frame_object()
708
709            # Check the constraint on the payload size for control frames
710            # before extension processes the frame.
711            # See also http://tools.ietf.org/html/rfc6455#section-5.5
712            if (common.is_control_opcode(frame.opcode) and
713                len(frame.payload) > 125):
714                raise InvalidFrameException(
715                    'Payload data size of control frames must be 125 bytes or '
716                    'less')
717
718            for frame_filter in self._options.incoming_frame_filters:
719                frame_filter.filter(frame)
720
721            if frame.rsv1 or frame.rsv2 or frame.rsv3:
722                raise UnsupportedFrameException(
723                    'Unsupported flag is set (rsv = %d%d%d)' %
724                    (frame.rsv1, frame.rsv2, frame.rsv3))
725
726            message = self._get_message_from_frame(frame)
727            if message is None:
728                continue
729
730            for message_filter in self._options.incoming_message_filters:
731                message = message_filter.filter(message)
732
733            if self._original_opcode == common.OPCODE_TEXT:
734                # The WebSocket protocol section 4.4 specifies that invalid
735                # characters must be replaced with U+fffd REPLACEMENT
736                # CHARACTER.
737                try:
738                    return message.decode('utf-8')
739                except UnicodeDecodeError, e:
740                    raise InvalidUTF8Exception(e)
741            elif self._original_opcode == common.OPCODE_BINARY:
742                return message
743            elif self._original_opcode == common.OPCODE_CLOSE:
744                self._process_close_message(message)
745                return None
746            elif self._original_opcode == common.OPCODE_PING:
747                self._process_ping_message(message)
748            elif self._original_opcode == common.OPCODE_PONG:
749                self._process_pong_message(message)
750            else:
751                raise UnsupportedFrameException(
752                    'Opcode %d is not supported' % self._original_opcode)
753
754    def _send_closing_handshake(self, code, reason):
755        body = create_closing_handshake_body(code, reason)
756        frame = create_close_frame(
757            body, mask=self._options.mask_send,
758            frame_filters=self._options.outgoing_frame_filters)
759
760        self._request.server_terminated = True
761
762        self._write(frame)
763
764    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
765        """Closes a WebSocket connection.
766
767        Args:
768            code: Status code for close frame. If code is None, a close
769                frame with empty body will be sent.
770            reason: string representing close reason.
771        Raises:
772            BadOperationException: when reason is specified with code None
773            or reason is not an instance of both str and unicode.
774        """
775
776        if self._request.server_terminated:
777            self._logger.debug(
778                'Requested close_connection but server is already terminated')
779            return
780
781        if code is None:
782            if reason is not None and len(reason) > 0:
783                raise BadOperationException(
784                    'close reason must not be specified if code is None')
785            reason = ''
786        else:
787            if not isinstance(reason, str) and not isinstance(reason, unicode):
788                raise BadOperationException(
789                    'close reason must be an instance of str or unicode')
790
791        self._send_closing_handshake(code, reason)
792        self._logger.debug(
793            'Sent server-initiated closing handshake (code=%r, reason=%r)',
794            code, reason)
795
796        if (code == common.STATUS_GOING_AWAY or
797            code == common.STATUS_PROTOCOL_ERROR):
798            # It doesn't make sense to wait for a close frame if the reason is
799            # protocol error or that the server is going away. For some of
800            # other reasons, it might not make sense to wait for a close frame,
801            # but it's not clear, yet.
802            return
803
804        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
805        # or until a server-defined timeout expires.
806        #
807        # For now, we expect receiving closing handshake right after sending
808        # out closing handshake.
809        message = self.receive_message()
810        if message is not None:
811            raise ConnectionTerminatedException(
812                'Didn\'t receive valid ack for closing handshake')
813        # TODO: 3. close the WebSocket connection.
814        # note: mod_python Connection (mp_conn) doesn't have close method.
815
816    def send_ping(self, body=''):
817        frame = create_ping_frame(
818            body,
819            self._options.mask_send,
820            self._options.outgoing_frame_filters)
821        self._write(frame)
822
823        self._ping_queue.append(body)
824
825    def _send_pong(self, body):
826        frame = create_pong_frame(
827            body,
828            self._options.mask_send,
829            self._options.outgoing_frame_filters)
830        self._write(frame)
831
832    def get_last_received_opcode(self):
833        """Returns the opcode of the WebSocket message which the last received
834        frame belongs to. The return value is valid iff immediately after
835        receive_message call.
836        """
837
838        return self._original_opcode
839
840    def _drain_received_data(self):
841        """Drains unread data in the receive buffer to avoid sending out TCP
842        RST packet. This is because when deflate-stream is enabled, some
843        DEFLATE block for flushing data may follow a close frame. If any data
844        remains in the receive buffer of a socket when the socket is closed,
845        it sends out TCP RST packet to the other peer.
846
847        Since mod_python's mp_conn object doesn't support non-blocking read,
848        we perform this only when pywebsocket is running in standalone mode.
849        """
850
851        # If self._options.deflate_stream is true, self._request is
852        # DeflateRequest, so we can get wrapped request object by
853        # self._request._request.
854        #
855        # Only _StandaloneRequest has _drain_received_data method.
856        if (self._options.deflate_stream and
857            ('_drain_received_data' in dir(self._request._request))):
858            self._request._request._drain_received_data()
859
860
861# vi:sts=4 sw=4 et
862