12da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# Copyright 2012, Google Inc.
22da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# All rights reserved.
32da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#
42da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# Redistribution and use in source and binary forms, with or without
52da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# modification, are permitted provided that the following conditions are
62da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# met:
72da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#
82da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#     * Redistributions of source code must retain the above copyright
92da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# notice, this list of conditions and the following disclaimer.
102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#     * Redistributions in binary form must reproduce the above
112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# copyright notice, this list of conditions and the following disclaimer
122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# in the documentation and/or other materials provided with the
132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# distribution.
142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#     * Neither the name of Google Inc. nor the names of its
152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# contributors may be used to endorse or promote products derived from
162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# this software without specific prior written permission.
172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#
182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
"""Classes and helper functions for the WebSocket multiplexing extension.
322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
332da489cd246702bee5938545b18a6f710ed214bcJamie GennisSpecification:
342da489cd246702bee5938545b18a6f710ed214bcJamie Gennishttp://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-03
352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis"""
362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
382da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport collections
392da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport copy
402da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport email
412da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport email.parser
422da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport logging
432da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport math
442da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport struct
452da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport threading
462da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport traceback
472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
482da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket import common
492da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket import handshake
502da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket import util
512da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_base import BadOperationException
522da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_base import ConnectionTerminatedException
532da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import Frame
542da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import Stream
552da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import StreamOptions
562da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import create_binary_frame
572da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import create_closing_handshake_body
582da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import create_header
592da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import parse_frame
602da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket.handshake import hybi
612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
# Logical channel ids with a fixed role. The _create_* helpers in this file
# prefix every control block payload with the control channel id.
_CONTROL_CHANNEL_ID = 0
_DEFAULT_CHANNEL_ID = 1

# Opcodes of multiplexing control blocks. The opcode is stored in the top
# three bits of the first byte of a control block.
_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
_MUX_OPCODE_FLOW_CONTROL = 2
_MUX_OPCODE_DROP_CHANNEL = 3
_MUX_OPCODE_NEW_CHANNEL_SLOT = 4

# Largest channel id that can be encoded; the widest (4 byte) channel id
# encoding carries 29 bits of id.
_MAX_CHANNEL_ID = 2 ** 29 - 1

# NOTE(review): presumably the number of channel slots and the send quota
# initially granted to the client; they are not used in this part of the
# file, so confirm against the handler code.
_INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64
_INITIAL_QUOTA_FOR_CLIENT = 8 * 1024

# Only these status codes are needed for now.
_HTTP_BAD_RESPONSE_MESSAGES = {
    common.HTTP_STATUS_BAD_REQUEST: 'Bad Request',
}
812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class MuxUnexpectedException(Exception):
    """Signals an unexpected condition while handling the multiplexing
    extension.
    """
862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
# Temporary
class MuxNotImplementedException(Exception):
    """Signals that execution reached a code path that is not implemented
    yet.
    """
922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class InvalidMuxFrameException(Exception):
    """Signals that a received multiplexed frame is invalid."""
972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class InvalidMuxControlBlockException(Exception):
    """Signals that a received multiplexing control block is invalid."""
1022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class LogicalConnectionClosedException(Exception):
    """Signals that a logical connection has been closed gracefully."""
1072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1092da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _encode_channel_id(channel_id):
1102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 0:
1112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Channel id %d must not be negative' % channel_id)
1122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 2 ** 7:
1142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return chr(channel_id)
1152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 2 ** 14:
1162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.pack('!H', 0x8000 + channel_id)
1172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 2 ** 21:
1182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        first = chr(0xc0 + (channel_id >> 16))
1192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return first + struct.pack('!H', channel_id & 0xffff)
1202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 2 ** 29:
1212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.pack('!L', 0xe0000000 + channel_id)
1222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    raise ValueError('Channel id %d is too large' % channel_id)
1242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1262da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _size_of_number_in_bytes_minus_1(number):
1272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    # Calculate the minimum number of bytes minus 1 that are required to store
1282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    # the data.
1292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if number < 0:
1302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Invalid number: %d' % number)
1312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 8:
1322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return 0
1332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 16:
1342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return 1
1352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 24:
1362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return 2
1372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 32:
1382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return 3
1392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    else:
1402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Invalid number %d' % number)
1412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1432da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _encode_number(number):
1442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if number < 2 ** 8:
1452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return chr(number)
1462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 16:
1472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.pack('!H', number)
1482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 24:
1492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return chr(number >> 16) + struct.pack('!H', number & 0xffff)
1502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    else:
1512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.pack('!L', number)
1522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
def _create_control_block_length_value(channel_id, opcode, flags, value):
    """Builds a control block laid out as first byte, objective channel id,
    encoded length of the opcode specific value, and the value itself.
    Most control blocks have this structure.

    Args:
        channel_id: objective channel id.
        opcode: opcode of the control block. Only AddChannelRequest,
            AddChannelResponse and DropChannel are accepted.
        flags: 3bit opcode specific flags.
        value: opcode specific data.

    Raises:
        ValueError: when channel_id, opcode, flags or the length of value
            is out of range.
    """

    if not 0 <= channel_id <= _MAX_CHANNEL_ID:
        raise ValueError('Invalid channel id: %d' % channel_id)
    length_value_opcodes = (_MUX_OPCODE_ADD_CHANNEL_REQUEST,
                            _MUX_OPCODE_ADD_CHANNEL_RESPONSE,
                            _MUX_OPCODE_DROP_CHANNEL)
    if opcode not in length_value_opcodes:
        raise ValueError('Invalid opcode: %d' % opcode)
    if not 0 <= flags <= 7:
        raise ValueError('Invalid flags: %x' % flags)
    value_length = len(value)
    if not 0 <= value_length <= 2 ** 32 - 1:
        raise ValueError('Invalid length: %d' % value_length)

    # First byte layout: opcode (3 bits), opcode specific flags (3 bits),
    # then the size in bytes of the length field minus 1 (2 bits).
    header_byte = chr((opcode << 5) |
                      (flags << 2) |
                      _size_of_number_in_bytes_minus_1(value_length))

    pieces = (header_byte, _encode_channel_id(channel_id),
              _encode_number(value_length), value)
    return pieces[0] + pieces[1] + pieces[2] + pieces[3]
1872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
def _create_add_channel_response(channel_id, encoded_handshake,
                                 encoding=0, rejected=False,
                                 outer_frame_mask=False):
    """Builds a binary frame carrying an AddChannelResponse control block
    on the control channel.

    Args:
        channel_id: objective channel id.
        encoded_handshake: handshake data placed in the block as its value.
        encoding: encoding flag of the handshake; must be 0 or 1.
        rejected: sets the rejected flag bit of the block when true.
        outer_frame_mask: passed to create_binary_frame as mask.

    Raises:
        ValueError: when encoding is neither 0 nor 1.
    """

    if encoding not in (0, 1):
        raise ValueError('Invalid encoding %d' % encoding)

    # Opcode specific flags: rejected is bit 2, encoding the low bits.
    response_flags = (rejected << 2) | encoding
    control_block = _create_control_block_length_value(
        channel_id, _MUX_OPCODE_ADD_CHANNEL_RESPONSE, response_flags,
        encoded_handshake)
    return create_binary_frame(
        _encode_channel_id(_CONTROL_CHANNEL_ID) + control_block,
        mask=outer_frame_mask)
2002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
def _create_drop_channel(channel_id, reason='', mux_error=False,
                         outer_frame_mask=False):
    """Builds a binary frame carrying a DropChannel control block on the
    control channel.

    Args:
        channel_id: objective channel id.
        reason: reason data placed in the block as its value. Must be
            empty unless mux_error is set.
        mux_error: sets the mux error flag bit of the block when true.
        outer_frame_mask: passed to create_binary_frame as mask.

    Raises:
        ValueError: when reason is given although mux_error is false.
    """

    if not (mux_error or len(reason) == 0):
        raise ValueError('Reason must be empty if mux_error is False')

    # The mux error flag is bit 2 of the opcode specific flags.
    control_block = _create_control_block_length_value(
        channel_id, _MUX_OPCODE_DROP_CHANNEL, mux_error << 2, reason)
    return create_binary_frame(
        _encode_channel_id(_CONTROL_CHANNEL_ID) + control_block,
        mask=outer_frame_mask)
2122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
def _create_flow_control(channel_id, replenished_quota,
                         outer_frame_mask=False):
    """Builds a binary frame carrying a FlowControl control block on the
    control channel.

    Args:
        channel_id: objective channel id.
        replenished_quota: quota to put in the block; must be non-negative
            and smaller than 2 ** 32.
        outer_frame_mask: passed to create_binary_frame as mask.

    Raises:
        ValueError: when replenished_quota is out of range.
    """

    if not 0 <= replenished_quota < 2 ** 32:
        raise ValueError('Invalid quota: %d' % replenished_quota)

    # First byte: opcode in the top 3 bits, size in bytes of the quota
    # field minus 1 in the low bits.
    quota_size = _size_of_number_in_bytes_minus_1(replenished_quota)
    header_byte = chr((_MUX_OPCODE_FLOW_CONTROL << 5) | quota_size)

    control_channel = _encode_channel_id(_CONTROL_CHANNEL_ID)
    objective_channel = _encode_channel_id(channel_id)
    quota = _encode_number(replenished_quota)
    return create_binary_frame(
        control_channel + header_byte + objective_channel + quota,
        mask=outer_frame_mask)
2242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
def _create_new_channel_slot(slots, send_quota, outer_frame_mask=False):
    """Builds a binary frame carrying a NewChannelSlot control block on the
    control channel.

    Args:
        slots: number of slots; must be non-negative and smaller than
            2 ** 32.
        send_quota: send quota; must be non-negative and smaller than
            2 ** 32.
        outer_frame_mask: passed to create_binary_frame as mask.

    Raises:
        ValueError: when slots or send_quota is out of range.
    """

    if not 0 <= slots < 2 ** 32:
        raise ValueError('Invalid number of slots: %d' % slots)
    if not 0 <= send_quota < 2 ** 32:
        raise ValueError('Invalid send quota: %d' % send_quota)

    # First byte: opcode in the top 3 bits, then the size in bytes minus 1
    # of the slots field (bits 2 and up) and of the send quota field (low
    # bits).
    size_bits = ((_size_of_number_in_bytes_minus_1(slots) << 2) |
                 _size_of_number_in_bytes_minus_1(send_quota))
    header_byte = chr((_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) | size_bits)

    control_channel = _encode_channel_id(_CONTROL_CHANNEL_ID)
    return create_binary_frame(
        control_channel + header_byte +
        _encode_number(slots) + _encode_number(send_quota),
        mask=outer_frame_mask)
2392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2412da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_request_text(request_text):
2422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    request_line, header_lines = request_text.split('\r\n', 1)
2432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    words = request_line.split(' ')
2452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if len(words) != 3:
2462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Bad Request-Line syntax %r' % request_line)
2472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    [command, path, version] = words
2482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if version != 'HTTP/1.1':
2492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Bad request version %r' % version)
2502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    # email.parser.Parser() parses RFC 2822 (RFC 822) style headers.
2522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    # RFC 6455 refers RFC 2616 for handshake parsing, and RFC 2616 refers
2532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    # RFC 822.
2542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    headers = email.parser.Parser().parsestr(header_lines)
2552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    return command, path, version, headers
2562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class _ControlBlock(object):
    """A structure that holds the parsing result of a multiplexing control
    block.

    Only the opcode is set at construction time. Control block specific
    attributes are added by _MuxFramePayloadParser afterwards (e.g.
    encoded_handshake is added for AddChannelRequest and
    AddChannelResponse).
    """

    def __init__(self, opcode):
        """Creates a control block that holds the given opcode."""
        self.opcode = opcode
2672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2692da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _MuxFramePayloadParser(object):
2702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """A class that parses multiplexed frame payload."""
2712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
    def __init__(self, payload):
        """Prepares to parse payload starting from its first byte.

        Args:
            payload: payload data of a multiplexed frame.
        """
        self._data = payload
        # Offset into self._data of the next byte to read; the read methods
        # advance it as they consume data.
        self._read_position = 0
        self._logger = util.get_class_logger(self)
2762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def read_channel_id(self):
2782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Reads channel id.
2792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Raises:
2812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            InvalidMuxFrameException: when the payload doesn't contain
2822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                valid channel id.
2832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
2842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        remaining_length = len(self._data) - self._read_position
2862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        pos = self._read_position
2872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if remaining_length == 0:
2882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise InvalidMuxFrameException('No channel id found')
2892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        channel_id = ord(self._data[pos])
2912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        channel_id_length = 1
2922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if channel_id & 0xe0 == 0xe0:
2932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if remaining_length < 4:
2942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise InvalidMuxFrameException(
2952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    'Invalid channel id format')
2962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id = struct.unpack('!L',
2972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                       self._data[pos:pos+4])[0] & 0x1fffffff
2982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id_length = 4
2992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        elif channel_id & 0xc0 == 0xc0:
3002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if remaining_length < 3:
3012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise InvalidMuxFrameException(
3022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    'Invalid channel id format')
3032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id = (((channel_id & 0x1f) << 16) +
3042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                          struct.unpack('!H', self._data[pos+1:pos+3])[0])
3052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id_length = 3
3062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        elif channel_id & 0x80 == 0x80:
3072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if remaining_length < 2:
3082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise InvalidMuxFrameException(
3092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    'Invalid channel id format')
3102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id = struct.unpack('!H',
3112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                       self._data[pos:pos+2])[0] & 0x3fff
3122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id_length = 2
3132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._read_position += channel_id_length
3142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return channel_id
3162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def read_inner_frame(self):
3182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Reads an inner frame.
3192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Raises:
3212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            InvalidMuxFrameException: when the inner frame is invalid.
3222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
3232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if len(self._data) == self._read_position:
3252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise InvalidMuxFrameException('No inner frame bits found')
3262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        bits = ord(self._data[self._read_position])
3272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._read_position += 1
3282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        fin = (bits & 0x80) == 0x80
3292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        rsv1 = (bits & 0x40) == 0x40
3302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        rsv2 = (bits & 0x20) == 0x20
3312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        rsv3 = (bits & 0x10) == 0x10
3322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        opcode = bits & 0xf
3332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        payload = self.remaining_data()
3342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Consume rest of the message which is payload data of the original
3352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # frame.
3362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._read_position = len(self._data)
3372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return fin, rsv1, rsv2, rsv3, opcode, payload
3382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _read_number(self, size):
3402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._read_position + size > len(self._data):
3412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise InvalidMuxControlBlock(
3422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Cannot read %d byte(s) number' % size)
3432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        pos = self._read_position
3452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if size == 1:
3462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._read_position += 1
3472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return ord(self._data[pos])
3482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        elif size == 2:
3492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._read_position += 2
3502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return struct.unpack('!H', self._data[pos:pos+2])[0]
3512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        elif size == 3:
3522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._read_position += 3
3532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return ((ord(self._data[pos]) << 16)
3542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    + struct.unpack('!H', self._data[pos+1:pos+3])[0])
3552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        elif size == 4:
3562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._read_position += 4
3572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return struct.unpack('!L', self._data[pos:pos+4])[0]
3582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
3592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise InvalidMuxControlBlockException(
3602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Cannot read %d byte(s) number' % size)
3612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _read_opcode_specific_data(self, opcode, size_of_size):
3632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Reads opcode specific data that consists of followings:
3642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            - the size of the opcode specific data (1-4 bytes)
3652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            - the opcode specific data
3662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        AddChannelRequest and DropChannel have this structure.
3672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
3682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._read_position + size_of_size > len(self._data):
3702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise InvalidMuxControlBlockException(
3712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'No size field for opcode %d' % opcode)
3722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        size = self._read_number(size_of_size)
3742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        pos = self._read_position
3762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if pos + size > len(self._data):
3772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise InvalidMuxControlBlockException(
3782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'No data field for opcode %d (%d + %d > %d)' %
3792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                (opcode, pos, size, len(self._data)))
3802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        specific_data = self._data[pos:pos+size]
3822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._read_position += size
3832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return specific_data
3842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _read_add_channel_request(self, first_byte, control_block):
3862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        reserved = (first_byte >> 4) & 0x1
3872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        encoding = (first_byte >> 2) & 0x3
3882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        size_of_handshake_size = (first_byte & 0x3) + 1
3892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        control_block.channel_id = self.read_channel_id()
3912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        encoded_handshake = self._read_opcode_specific_data(
3922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                _MUX_OPCODE_ADD_CHANNEL_REQUEST,
3932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                size_of_handshake_size)
3942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        control_block.encoding = encoding
3952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        control_block.encoded_handshake = encoded_handshake
3962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return control_block
3972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _read_flow_control(self, first_byte, control_block):
3992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        quota_size = (first_byte & 0x3) + 1
4002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        control_block.channel_id = self.read_channel_id()
4012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        control_block.send_quota = self._read_number(quota_size)
4022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return control_block
4032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _read_drop_channel(self, first_byte, control_block):
4052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        mux_error = (first_byte >> 4) & 0x1
4062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        reserved = (first_byte >> 2) & 0x3
4072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        size_of_reason_size = (first_byte & 0x3) + 1
4082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        control_block.channel_id = self.read_channel_id()
4102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        reason = self._read_opcode_specific_data(
4112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                     _MUX_OPCODE_ADD_CHANNEL_RESPONSE,
4122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                     size_of_reason_size)
4132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if mux_error and len(reason) > 0:
4142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise InvalidMuxControlBlockException(
4152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Reason must be empty when F bit is set')
4162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        control_block.mux_error = mux_error
4172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        control_block.reason = reason
4182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return control_block
4192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _read_new_channel_slot(self, first_byte, control_block):
4212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # TODO(bashi): Implement
4222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise MuxNotImplementedException('NewChannelSlot is not implemented')
4232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def read_control_blocks(self):
4252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Reads control block(s).
4262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Raises:
4282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis           InvalidMuxControlBlock: when the payload contains invalid control
4292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis               block(s).
4302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis           StopIteration: when no control blocks left.
4312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
4322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        while self._read_position < len(self._data):
4342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if self._read_position >= len(self._data):
4352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise InvalidMuxControlBlockException(
4362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    'No control opcode found')
4372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            first_byte = ord(self._data[self._read_position])
4382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._read_position += 1
4392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            opcode = (first_byte >> 5) & 0x7
4402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            control_block = _ControlBlock(opcode=opcode)
4412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
4422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                yield self._read_add_channel_request(first_byte, control_block)
4432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            elif opcode == _MUX_OPCODE_FLOW_CONTROL:
4442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                yield self._read_flow_control(first_byte, control_block)
4452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            elif opcode == _MUX_OPCODE_DROP_CHANNEL:
4462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                yield self._read_drop_channel(first_byte, control_block)
4472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
4482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                yield self._read_new_channel_slot(first_byte, control_block)
4492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            else:
4502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise InvalidMuxControlBlockException(
4512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    'Invalid opcode %d' % opcode)
4522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        assert self._read_position == len(self._data)
4532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise StopIteration
4542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def remaining_data(self):
4562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Returns remaining data."""
4572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return self._data[self._read_position:]
4592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class _LogicalRequest(object):
    """Request object for one logical channel, shaped like a mod_python
    request.
    """

    def __init__(self, channel_id, command, path, headers, connection):
        """Constructs an instance.

        Args:
            channel_id: the channel id of the logical channel.
            command: HTTP request command. Stored as the method attribute.
            path: HTTP request path. Stored as the uri attribute.
            headers: HTTP headers. Stored as the headers_in attribute.
            connection: _LogicalConnection instance.
        """

        # Neither side has terminated when the request is created.
        self.server_terminated = False
        self.client_terminated = False

        self.channel_id = channel_id
        self.connection = connection
        self.method, self.uri = command, path
        self.headers_in = headers

    def is_https(self):
        """Mimics request.is_https().

        Returns:
            Always False, because this method is used only by old protocols
            (hixie and hybi00).
        """

        return False
4892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class _LogicalConnection(object):
    """Mimics mod_python mp_conn.

    Incoming data is buffered in memory and guarded by a read condition.
    Outgoing data is handed to the mux handler, and the writer waits on a
    write condition until notify_write_done() is called.

    Both conditions are always acquired immediately before a try block whose
    finally clause releases them, so an exception raised while a lock is
    held cannot leave it locked.
    """

    # For details, see the comment of set_read_state().
    STATE_ACTIVE = 1
    STATE_GRACEFULLY_CLOSED = 2
    STATE_TERMINATED = 3

    def __init__(self, mux_handler, channel_id):
        """Constructs an instance.

        Args:
            mux_handler: _MuxHandler instance.
            channel_id: channel id of this connection.
        """

        self._mux_handler = mux_handler
        self._channel_id = channel_id
        self._incoming_data = ''
        self._write_condition = threading.Condition()
        self._waiting_write_completion = False
        self._read_condition = threading.Condition()
        self._read_state = self.STATE_ACTIVE

    def get_local_addr(self):
        """Getter to mimic mp_conn.local_addr."""

        return self._mux_handler.physical_connection.get_local_addr()
    local_addr = property(get_local_addr)

    def get_remote_addr(self):
        """Getter to mimic mp_conn.remote_addr."""

        return self._mux_handler.physical_connection.get_remote_addr()
    remote_addr = property(get_remote_addr)

    def get_memorized_lines(self):
        """Gets memorized lines. Not supported.

        Raises:
            MuxUnexpectedException: always.
        """

        raise MuxUnexpectedException('_LogicalConnection does not support '
                                     'get_memorized_lines')

    def write(self, data):
        """Writes data. mux_handler sends data asynchronously. The caller will
        be suspended until write done.

        Args:
            data: data to be written.

        Raises:
            MuxUnexpectedException: when called before finishing the previous
                write.
        """

        self._write_condition.acquire()
        try:
            if self._waiting_write_completion:
                raise MuxUnexpectedException(
                    'Logical connection %d is already waiting the completion '
                    'of write' % self._channel_id)

            self._waiting_write_completion = True
            # The condition is still held here, so notify_write_done() can
            # only get in once wait() below has released it.
            self._mux_handler.send_data(self._channel_id, data)
            self._write_condition.wait()
        finally:
            self._write_condition.release()

    def write_control_data(self, data):
        """Writes data via the control channel. Don't wait finishing write
        because this method can be called by mux dispatcher.

        Args:
            data: data to be written.
        """

        self._mux_handler.send_control_data(data)

    def notify_write_done(self):
        """Called when sending data is completed. Wakes up the thread that
        is suspended in write().

        Raises:
            MuxUnexpectedException: when no write is waiting for completion.
        """

        self._write_condition.acquire()
        try:
            if not self._waiting_write_completion:
                raise MuxUnexpectedException(
                    'Invalid call of notify_write_done for logical connection'
                    ' %d' % self._channel_id)
            self._waiting_write_completion = False
            self._write_condition.notify()
        finally:
            self._write_condition.release()

    def append_frame_data(self, frame_data):
        """Appends incoming frame data. Called when mux_handler dispatches
        frame data to the corresponding application.

        Args:
            frame_data: incoming frame data.
        """

        self._read_condition.acquire()
        try:
            self._incoming_data += frame_data
            self._read_condition.notify()
        finally:
            self._read_condition.release()

    def read(self, length):
        """Reads data. Blocks until enough data has arrived via physical
        connection.

        Args:
            length: length of data to be read.

        Returns:
            The first length bytes of the buffered incoming data, which are
            removed from the buffer.

        Raises:
            LogicalConnectionClosedException: when closing handshake for this
                logical channel has been received.
            ConnectionTerminatedException: when the physical connection has
                closed, or an error is caused on the reader thread.
        """

        self._read_condition.acquire()
        try:
            # Wake-ups come from append_frame_data() and set_read_state().
            while (self._read_state == self.STATE_ACTIVE and
                   len(self._incoming_data) < length):
                self._read_condition.wait()

            if self._read_state == self.STATE_GRACEFULLY_CLOSED:
                raise LogicalConnectionClosedException(
                    'Logical channel %d has closed.' % self._channel_id)
            elif self._read_state == self.STATE_TERMINATED:
                raise ConnectionTerminatedException(
                    'Receiving %d byte failed. Logical channel (%d) closed' %
                    (length, self._channel_id))

            value = self._incoming_data[:length]
            self._incoming_data = self._incoming_data[length:]
        finally:
            self._read_condition.release()

        return value

    def set_read_state(self, new_state):
        """Sets the state of this connection. Called when an event for this
        connection has occurred.

        Args:
            new_state: state to be set. new_state must be one of followings:
            - STATE_GRACEFULLY_CLOSED: when closing handshake for this
                connection has been received.
            - STATE_TERMINATED: when the physical connection has closed or
                DropChannel of this connection has received.
        """

        self._read_condition.acquire()
        try:
            self._read_state = new_state
            # Lets a reader blocked in read() re-check the state.
            self._read_condition.notify()
        finally:
            self._read_condition.release()
6452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6472da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _LogicalStream(Stream):
6482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """Mimics the Stream class. This class interprets multiplexed WebSocket
6492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    frames.
6502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """
6512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self, request, send_quota, receive_quota):
6532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Constructs an instance.
6542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
6562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            request: _LogicalRequest instance.
6572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            send_quota: Initial send quota.
6582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            receive_quota: Initial receive quota.
6592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
6602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # TODO(bashi): Support frame filters.
6622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        stream_options = StreamOptions()
6632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Physical stream is responsible for masking.
6642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        stream_options.unmask_receive = False
6652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Control frames can be fragmented on logical channel.
6662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        stream_options.allow_fragmented_control_frame = True
6672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Stream.__init__(self, request, stream_options)
6682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._send_quota = send_quota
6692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._send_quota_condition = threading.Condition()
6702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._receive_quota = receive_quota
6712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._write_inner_frame_semaphore = threading.Semaphore()
6722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _create_inner_frame(self, opcode, payload, end=True):
6742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # TODO(bashi): Support extensions that use reserved bits.
6752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        first_byte = (end << 7) | opcode
6762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return (_encode_channel_id(self._request.channel_id) +
6772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                chr(first_byte) + payload)
6782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _write_inner_frame(self, opcode, payload, end=True):
6802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        payload_length = len(payload)
6812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        write_position = 0
6822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
6842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            # An inner frame will be fragmented if there is no enough send
6852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            # quota. This semaphore ensures that fragmented inner frames are
6862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            # sent in order on the logical channel.
6872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            # Note that frames that come from other logical channels or
6882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            # multiplexing control blocks can be inserted between fragmented
6892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            # inner frames on the physical channel.
6902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._write_inner_frame_semaphore.acquire()
6912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            while write_position < payload_length:
6922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                try:
6932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._send_quota_condition.acquire()
6942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    while self._send_quota == 0:
6952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        self._logger.debug(
6962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                            'No quota. Waiting FlowControl message for %d.' %
6972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                            self._request.channel_id)
6982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        self._send_quota_condition.wait()
6992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    remaining = payload_length - write_position
7012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    write_length = min(self._send_quota, remaining)
7022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    inner_frame_end = (
7032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        end and
7042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        (write_position + write_length == payload_length))
7052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    inner_frame = self._create_inner_frame(
7072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        opcode,
7082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        payload[write_position:write_position+write_length],
7092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        inner_frame_end)
7102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    frame_data = self._writer.build(
7112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        inner_frame, end=True, binary=True)
7122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._send_quota -= write_length
7132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._logger.debug('Consumed quota=%d, remaining=%d' %
7142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                       (write_length, self._send_quota))
7152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                finally:
7162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._send_quota_condition.release()
7172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                # Writing data will block the worker so we need to release
7192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                # _send_quota_condition before writing.
7202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logger.debug('Sending inner frame: %r' % frame_data)
7212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._request.connection.write(frame_data)
7222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                write_position += write_length
7232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                opcode = common.OPCODE_CONTINUATION
7252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except ValueError, e:
7272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise BadOperationException(e)
7282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
7292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._write_inner_frame_semaphore.release()
7302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def replenish_send_quota(self, send_quota):
7322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Replenish send quota."""
7332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._send_quota_condition.acquire()
7352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._send_quota += send_quota
7362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Replenished send quota for channel id %d: %d' %
7372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                           (self._request.channel_id, self._send_quota))
7382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._send_quota_condition.notify()
7392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._send_quota_condition.release()
7402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def consume_receive_quota(self, amount):
7422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Consumes receive quota. Returns False on failure."""
7432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._receive_quota < amount:
7452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logger.debug('Violate quota on channel id %d: %d < %d' %
7462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                               (self._request.channel_id,
7472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                self._receive_quota, amount))
7482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return False
7492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._receive_quota -= amount
7502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return True
7512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def send_message(self, message, end=True, binary=False):
7532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Override Stream.send_message."""
7542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._request.server_terminated:
7562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise BadOperationException(
7572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Requested send_message after sending out a closing handshake')
7582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if binary and isinstance(message, unicode):
7602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise BadOperationException(
7612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Message for binary frame must be instance of str')
7622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if binary:
7642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            opcode = common.OPCODE_BINARY
7652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
7662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            opcode = common.OPCODE_TEXT
7672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            message = message.encode('utf-8')
7682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._write_inner_frame(opcode, message, end)
7702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _receive_frame(self):
7722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Overrides Stream._receive_frame.
7732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        In addition to call Stream._receive_frame, this method adds the amount
7752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        of payload to receiving quota and sends FlowControl to the client.
7762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        We need to do it here because Stream.receive_message() handles
7772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        control frames internally.
7782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
7792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self)
7812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        amount = len(payload)
7822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._receive_quota += amount
7832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        frame_data = _create_flow_control(self._request.channel_id,
7842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                          amount)
7852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Sending flow control for %d, replenished=%d' %
7862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                           (self._request.channel_id, amount))
7872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._request.connection.write_control_data(frame_data)
7882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return opcode, payload, fin, rsv1, rsv2, rsv3
7892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def receive_message(self):
7912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Overrides Stream.receive_message."""
7922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Just call Stream.receive_message(), but catch
7942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # LogicalConnectionClosedException, which is raised when the logical
7952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # connection has closed gracefully.
7962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
7972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return Stream.receive_message(self)
7982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except LogicalConnectionClosedException, e:
7992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logger.debug('%s', e)
8002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return None
8012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _send_closing_handshake(self, code, reason):
8032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Overrides Stream._send_closing_handshake."""
8042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        body = create_closing_handshake_body(code, reason)
8062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Sending closing handshake for %d: (%r, %r)' %
8072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                           (self._request.channel_id, code, reason))
8082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._write_inner_frame(common.OPCODE_CLOSE, body, end=True)
8092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._request.server_terminated = True
8112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def send_ping(self, body=''):
8132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Overrides Stream.send_ping"""
8142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Sending ping on logical channel %d: %r' %
8162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                           (self._request.channel_id, body))
8172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._write_inner_frame(common.OPCODE_PING, body, end=True)
8182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._ping_queue.append(body)
8202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _send_pong(self, body):
8222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Overrides Stream._send_pong"""
8232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Sending pong on logical channel %d: %r' %
8252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                           (self._request.channel_id, body))
8262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._write_inner_frame(common.OPCODE_PONG, body, end=True)
8272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
8292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Overrides Stream.close_connection."""
8302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # TODO(bashi): Implement
8322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Closing logical connection %d' %
8332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                           self._request.channel_id)
8342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._request.server_terminated = True
8352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _drain_received_data(self):
8372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Overrides Stream._drain_received_data. Nothing need to be done for
8382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        logical channel.
8392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
8402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        pass
8422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8442da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _OutgoingData(object):
8452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """A structure that holds data to be sent via physical connection and
8462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    origin of the data.
8472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """
8482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self, channel_id, data):
8502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.channel_id = channel_id
8512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.data = data
8522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8542da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _PhysicalConnectionWriter(threading.Thread):
8552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """A thread that is responsible for writing data to physical connection.
8562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    TODO(bashi): Make sure there is no thread-safety problem when the reader
8582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    thread reads data from the same socket at a time.
8592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """
8602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self, mux_handler):
8622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Constructs an instance.
8632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
8652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            mux_handler: _MuxHandler instance.
8662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
8672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        threading.Thread.__init__(self)
8692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger = util.get_class_logger(self)
8702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._mux_handler = mux_handler
8712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.setDaemon(True)
8722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stop_requested = False
8732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._deque = collections.deque()
8742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._deque_condition = threading.Condition()
8752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def put_outgoing_data(self, data):
8772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Puts outgoing data.
8782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
8802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            data: _OutgoingData instance.
8812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Raises:
8832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            BadOperationException: when the thread has been requested to
8842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                terminate.
8852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
8862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
8882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._deque_condition.acquire()
8892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if self._stop_requested:
8902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise BadOperationException('Cannot write data anymore')
8912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._deque.append(data)
8932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._deque_condition.notify()
8942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
8952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._deque_condition.release()
8962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
8972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _write_data(self, outgoing_data):
8982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
8992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._mux_handler.physical_connection.write(outgoing_data.data)
9002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except Exception, e:
9012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            util.prepend_message_to_exception(
9022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Failed to send message to %r: ' %
9032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                (self._mux_handler.physical_connection.remote_addr,), e)
9042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise
9052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # TODO(bashi): It would be better to block the thread that sends
9072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # control data as well.
9082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if outgoing_data.channel_id != _CONTROL_CHANNEL_ID:
9092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._mux_handler.notify_write_done(outgoing_data.channel_id)
9102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def run(self):
9122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._deque_condition.acquire()
9132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        while not self._stop_requested:
9142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if len(self._deque) == 0:
9152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._deque_condition.wait()
9162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                continue
9172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            outgoing_data = self._deque.popleft()
9192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._deque_condition.release()
9202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._write_data(outgoing_data)
9212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._deque_condition.acquire()
9222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Flush deque
9242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
9252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            while len(self._deque) > 0:
9262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                outgoing_data = self._deque.popleft()
9272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._write_data(outgoing_data)
9282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
9292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._deque_condition.release()
9302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def stop(self):
9322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Stops the writer thread."""
9332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._deque_condition.acquire()
9352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stop_requested = True
9362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._deque_condition.notify()
9372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._deque_condition.release()
9382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9402da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _PhysicalConnectionReader(threading.Thread):
9412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """A thread that is responsible for reading data from physical connection.
9422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """
9432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self, mux_handler):
9452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Constructs an instance.
9462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
9482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            mux_handler: _MuxHandler instance.
9492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
9502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        threading.Thread.__init__(self)
9522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger = util.get_class_logger(self)
9532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._mux_handler = mux_handler
9542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.setDaemon(True)
9552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def run(self):
9572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        while True:
9582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            try:
9592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                physical_stream = self._mux_handler.physical_stream
9602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                message = physical_stream.receive_message()
9612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if message is None:
9622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    break
9632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                opcode = physical_stream.get_last_received_opcode()
9642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if opcode == common.OPCODE_TEXT:
9652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    raise MuxUnexpectedException(
9662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        'Received a text message on physical connection')
9672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            except ConnectionTerminatedException, e:
9682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logger.debug('%s', e)
9692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                break
9702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            try:
9722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._mux_handler.dispatch_message(message)
9732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            except Exception, e:
9742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logger.debug(traceback.format_exc())
9752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                break
9762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._mux_handler.notify_reader_done()
9782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
9792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class _Worker(threading.Thread):
    """A daemon thread that runs the application handler for one logical
    channel.
    """

    def __init__(self, mux_handler, request):
        """Constructs an instance.

        Args:
            mux_handler: _MuxHandler instance.
            request: _LogicalRequest instance.
        """

        threading.Thread.__init__(self)
        self._logger = util.get_class_logger(self)
        self.setDaemon(True)
        self._request = request
        self._mux_handler = mux_handler

    def run(self):
        """Transfers data for the request and reports completion.

        The mux handler is notified with the channel id once
        transfer_data() has finished, whether or not it raised.
        """

        request = self._request
        handler = self._mux_handler

        self._logger.debug('Logical channel worker started. (id=%d)' %
                           request.channel_id)
        try:
            # Non-critical exceptions will be handled by dispatcher.
            handler.dispatcher.transfer_data(request)
        finally:
            handler.notify_worker_done(request.channel_id)
10072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class _MuxHandshaker(hybi.Handshaker):
    """Opening handshake processor for multiplexing.

    Creates a _LogicalStream for the request instead of a regular stream,
    and sends the handshake response as control data on the connection of
    the request.
    """

    def __init__(self, request, dispatcher, send_quota, receive_quota):
        """Constructs an instance.

        Args:
            request: _LogicalRequest instance.
            dispatcher: Dispatcher instance (dispatch.Dispatcher).
            send_quota: Initial send quota.
            receive_quota: Initial receive quota.
        """

        hybi.Handshaker.__init__(self, request, dispatcher)
        # Initial quotas handed to the logical stream in _create_stream.
        self._receive_quota = receive_quota
        self._send_quota = send_quota

    def _create_stream(self, stream_options):
        """Override hybi.Handshaker._create_stream.

        Args:
            stream_options: not used by this override.

        Returns:
            A _LogicalStream built from the request and the initial quotas.
        """

        request = self._request
        self._logger.debug('Creating logical stream for %d' %
                           request.channel_id)
        return _LogicalStream(request, self._send_quota, self._receive_quota)

    def _send_handshake(self, accept):
        """Override hybi.Handshaker._send_handshake.

        Wraps the handshake response into an AddChannelResponse and writes
        it as control data. Nothing is sent for the default channel.

        Args:
            accept: passed on to _create_handshake_response.
        """

        request = self._request

        # Don't send handshake response for the default channel
        if request.channel_id != _DEFAULT_CHANNEL_ID:
            response = self._create_handshake_response(accept)
            frame_data = _create_add_channel_response(
                request.channel_id, response)
            self._logger.debug('Sending handshake response for %d: %r' %
                               (request.channel_id, frame_data))
            request.connection.write_control_data(frame_data)
10472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
class _LogicalChannelData(object):
    """Per-channel record kept for every logical channel.

    Bundles the logical request with the worker that serves it and carries
    the multiplexing error state of the channel.
    """

    def __init__(self, request, worker):
        # Error state; starts out as "no error".
        self.mux_error_reason = ''
        self.mux_error_occurred = False
        # The logical request and the worker handling it.
        self.request, self.worker = request, worker
10582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10602da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _MuxHandler(object):
10612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """Multiplexing handler. When a handler starts, it launches three
10622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    threads; the reader thread, the writer thread, and a worker thread.
10632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    The reader thread reads data from the physical stream, i.e., the
10652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    ws_stream object of the underlying websocket connection. The reader
10662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    thread interprets multiplexed frames and dispatches them to logical
10672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    channels. Methods of this class are mostly called by the reader thread.
10682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    The writer thread sends multiplexed frames which are created by
10702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    logical channels via the physical connection.
10712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    The worker thread launched at the starting point handles the
10732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    "Implicitly Opened Connection". If multiplexing handler receives
10742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    an AddChannelRequest and accepts it, the handler will launch a new worker
10752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    thread and dispatch the request to it.
10762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """
10772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self, request, dispatcher):
10792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Constructs an instance.
10802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
10822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            request: mod_python request of the physical connection.
10832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            dispatcher: Dispatcher instance (dispatch.Dispatcher).
10842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
10852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.original_request = request
10872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.dispatcher = dispatcher
10882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.physical_connection = request.connection
10892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.physical_stream = request.ws_stream
10902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger = util.get_class_logger(self)
10912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels = {}
10922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels_condition = threading.Condition()
10932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Holds client's initial quota
10942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._channel_slots = collections.deque()
10952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._worker_done_notify_received = False
10962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._reader = None
10972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._writer = None
10982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
10992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def start(self):
11002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Starts the handler.
11012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Raises:
11032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            MuxUnexpectedException: when the handler already started, or when
11042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                opening handshake of the default channel fails.
11052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
11062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._reader or self._writer:
11082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise MuxUnexpectedException('MuxHandler already started')
11092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._reader = _PhysicalConnectionReader(self)
11112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._writer = _PhysicalConnectionWriter(self)
11122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._reader.start()
11132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._writer.start()
11142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Create "Implicitly Opened Connection".
11162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID)
11172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        headers_in = copy.copy(self.original_request.headers_in)
11182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # TODO(bashi): Support extensions
11192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        headers_in['Sec-WebSocket-Extensions'] = ''
11202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        logical_request = _LogicalRequest(_DEFAULT_CHANNEL_ID,
11212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                          self.original_request.method,
11222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                          self.original_request.uri,
11232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                          headers_in,
11242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                          logical_connection)
11252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Client's send quota for the implicitly opened connection is zero,
11262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # but we will send FlowControl later so set the initial quota to
11272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # _INITIAL_QUOTA_FOR_CLIENT.
11282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT)
11292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if not self._do_handshake_for_logical_request(
11302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            logical_request, send_quota=self.original_request.mux_quota):
11312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise MuxUnexpectedException(
11322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Failed handshake on the default channel id')
11332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._add_logical_channel(logical_request)
11342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Send FlowControl for the implicitly opened connection.
11362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID,
11372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                          _INITIAL_QUOTA_FOR_CLIENT)
11382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        logical_request.connection.write_control_data(frame_data)
11392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def add_channel_slots(self, slots, send_quota):
11412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Adds channel slots.
11422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
11442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            slots: number of slots to be added.
11452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            send_quota: initial send quota for slots.
11462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
11472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._channel_slots.extend([send_quota] * slots)
11492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Send NewChannelSlot to client.
11502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        frame_data = _create_new_channel_slot(slots, send_quota)
11512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.send_control_data(frame_data)
11522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def wait_until_done(self, timeout=None):
11542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Waits until all workers are done. Returns False when timeout has
11552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        occurred. Returns True on success.
11562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
11582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            timeout: timeout in sec.
11592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
11602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels_condition.acquire()
11622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
11632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            while len(self._logical_channels) > 0:
11642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logger.debug('Waiting workers(%d)...' %
11652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                   len(self._logical_channels))
11662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._worker_done_notify_received = False
11672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logical_channels_condition.wait(timeout)
11682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if not self._worker_done_notify_received:
11692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._logger.debug('Waiting worker(s) timed out')
11702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    return False
11712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
11732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
11742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Flush pending outgoing data
11762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._writer.stop()
11772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._writer.join()
11782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return True
11802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def notify_write_done(self, channel_id):
11822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Called by the writer thread when a write operation has done.
11832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
11852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id: objective channel id.
11862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
11872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
11892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.acquire()
11902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if channel_id in self._logical_channels:
11912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_data = self._logical_channels[channel_id]
11922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_data.request.connection.notify_write_done()
11932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            else:
11942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logger.debug('Seems that logical channel for %d has gone'
11952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                   % channel_id)
11962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
11972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
11982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
11992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def send_control_data(self, data):
12002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Sends data via the control channel.
12012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
12032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            data: data to be sent.
12042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
12052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._writer.put_outgoing_data(_OutgoingData(
12072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_id=_CONTROL_CHANNEL_ID, data=data))
12082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def send_data(self, channel_id, data):
12102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Sends data via given logical channel. This method is called by
12112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        worker threads.
12122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
12142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id: objective channel id.
12152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            data: data to be sent.
12162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
12172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._writer.put_outgoing_data(_OutgoingData(
12192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_id=channel_id, data=data))
12202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _send_drop_channel(self, channel_id, reason='', mux_error=False):
12222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        frame_data = _create_drop_channel(channel_id, reason, mux_error)
12232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug(
12242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            'Sending drop channel for channel id %d' % channel_id)
12252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.send_control_data(frame_data)
12262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _send_error_add_channel_response(self, channel_id, status=None):
12282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if status is None:
12292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            status = common.HTTP_STATUS_BAD_REQUEST
12302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if status in _HTTP_BAD_RESPONSE_MESSAGES:
12322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            message = _HTTP_BAD_RESPONSE_MESSAGES[status]
12332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
12342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logger.debug('Response message for %d is not found' % status)
12352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            message = '???'
12362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message)
12382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        frame_data = _create_add_channel_response(channel_id,
12392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                                  encoded_handshake=response,
12402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                                  encoding=0, rejected=True)
12412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.send_control_data(frame_data)
12422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _create_logical_request(self, block):
12442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if block.channel_id == _CONTROL_CHANNEL_ID:
12452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise MuxUnexpectedException(
12462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Received the control channel id (0) as objective channel '
12472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'id for AddChannel')
12482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if block.encoding != 0:
12502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise MuxNotImplementedException(
12512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'delta-encoding not supported yet')
12522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        connection = _LogicalConnection(self, block.channel_id)
12532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        command, path, version, headers = _parse_request_text(
12542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                              block.encoded_handshake)
12552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        request = _LogicalRequest(block.channel_id, command, path,
12562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                  headers, connection)
12572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return request
12592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _do_handshake_for_logical_request(self, request, send_quota=0):
12612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
12622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            receive_quota = self._channel_slots.popleft()
12632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except IndexError:
12642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise MuxUnexpectedException('No room in channel pool')
12652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        handshaker = _MuxHandshaker(request, self.dispatcher,
12672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                    send_quota, receive_quota)
12682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
12692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            handshaker.do_handshake()
12702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except handshake.VersionException, e:
12712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logger.info('%s', e)
12722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._send_error_add_channel_response(
12732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
12742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return False
12752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except handshake.HandshakeException, e:
12762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logger.info('%s', e)
12772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._send_error_add_channel_response(request.channel_id,
12782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                                  status=e.status)
12792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return False
12802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except handshake.AbortedByUserException, e:
12812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logger.info('%s', e)
12822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._send_error_add_channel_response(request.channel_id)
12832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return False
12842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return True
12862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
12872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _add_logical_channel(self, logical_request):
12882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
12892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.acquire()
12902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if logical_request.channel_id in self._logical_channels:
12912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise MuxUnexpectedException('Channel id %d already exists' %
12922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                             logical_request.channel_id)
12932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            worker = _Worker(self, logical_request)
12942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_data = _LogicalChannelData(logical_request, worker)
12952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels[logical_request.channel_id] = channel_data
12962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            worker.start()
12972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
12982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
12992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
13002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _process_add_channel_request(self, block):
13012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
13022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            logical_request = self._create_logical_request(block)
13032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except ValueError, e:
13042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logger.debug('Failed to create logical request: %r' % e)
13052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._send_error_add_channel_response(
13062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
13072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return
13082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._do_handshake_for_logical_request(logical_request):
13092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._add_logical_channel(logical_request)
13102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
13112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._send_error_add_channel_response(
13122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
13132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
13142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _process_flow_control(self, block):
13152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
13162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.acquire()
13172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if not block.channel_id in self._logical_channels:
13182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                return
13192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_data = self._logical_channels[block.channel_id]
13202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_data.request.ws_stream.replenish_send_quota(
13212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                block.send_quota)
13222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
13232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
13242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
13252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _process_drop_channel(self, block):
13262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('DropChannel received for %d: reason=%r' %
13272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                           (block.channel_id, block.reason))
13282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
13292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.acquire()
13302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if not block.channel_id in self._logical_channels:
13312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                return
13322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_data = self._logical_channels[block.channel_id]
13332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if not block.mux_error:
13342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_data.request.connection.set_read_state(
13352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    _LogicalConnection.STATE_TERMINATED)
13362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            else:
13372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                # TODO(bashi): What should we do?
13382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_data.request.connection.set_read_state(
13392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    _LogicalConnection.STATE_TERMINATED)
13402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
13412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
13422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
13432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _process_new_channel_slot(self, block):
13442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise MuxUnexpectedException('Client should not send NewChannelSlot')
13452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
13462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _process_control_blocks(self, parser):
13472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        for control_block in parser.read_control_blocks():
13482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            opcode = control_block.opcode
13492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logger.debug('control block received, opcode: %d' % opcode)
13502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
13512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._process_add_channel_request(control_block)
13522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            elif opcode == _MUX_OPCODE_FLOW_CONTROL:
13532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._process_flow_control(control_block)
13542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            elif opcode == _MUX_OPCODE_DROP_CHANNEL:
13552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._process_drop_channel(control_block)
13562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
13572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._process_new_channel_slot(control_block)
13582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            else:
13592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise InvalidMuxControlBlockException(
13602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    'Invalid opcode')
13612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
13622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _process_logical_frame(self, channel_id, parser):
13632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Received a frame. channel id=%d' % channel_id)
13642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
13652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.acquire()
13662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if not channel_id in self._logical_channels:
13672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise MuxUnexpectedException(
13682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    'Channel id %d not found' % channel_id)
13692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_data = self._logical_channels[channel_id]
13702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame()
13712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if not channel_data.request.ws_stream.consume_receive_quota(
13722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                len(payload)):
13732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                # The client violates quota. Close logical channel.
13742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_data.mux_error_occurred = True
13752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_data.mux_error_reason = 'Quota violation'
13762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_data.request.connection.set_read_state(
13772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    _LogicalConnection.STATE_TERMINATED)
13782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                return
13792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3,
13802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                   mask=False)
13812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            frame_data = header + payload
13822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_data.request.connection.append_frame_data(frame_data)
13832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
13842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
13852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
13862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def dispatch_message(self, message):
13872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Dispatches message. The reader thread calls this method.
13882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
13892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
13902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            message: a message that contains encapsulated frame.
13912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Raises:
13922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            InvalidMuxFrameException: if the message is invalid.
13932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
13942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
13952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        parser = _MuxFramePayloadParser(message)
13962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        channel_id = parser.read_channel_id()
13972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if channel_id == _CONTROL_CHANNEL_ID:
13982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._process_control_blocks(parser)
13992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
14002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._process_logical_frame(channel_id, parser)
14012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def notify_worker_done(self, channel_id):
14032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """Called when a worker has finished.
14042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        Args:
14062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id: channel id corresponded with the worker.
14072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
14082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Worker for channel id %d terminated' % channel_id)
14102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
14112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.acquire()
14122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if not channel_id in self._logical_channels:
14132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise MuxUnexpectedException(
14142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    'Channel id %d not found' % channel_id)
14152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_data = self._logical_channels.pop(channel_id)
14162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
14172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._worker_done_notify_received = True
14182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.notify()
14192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
14202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if not channel_data.request.server_terminated:
14222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if channel_data.mux_error_occurred:
14232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._send_drop_channel(
14242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    channel_id, reason=channel_data.mux_error_reason,
14252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    mux_error=True)
14262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            else:
14272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._send_drop_channel(channel_id)
14282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def notify_reader_done(self):
14302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """This method is called by the reader thread when the reader has
14312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finished.
14322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        """
14332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Terminate all logical connections
14352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('termiating all logical connections...')
14362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels_condition.acquire()
14372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        for channel_data in self._logical_channels.values():
14382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            try:
14392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                channel_data.request.connection.set_read_state(
14402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    _LogicalConnection.STATE_TERMINATED)
14412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            except Exception:
14422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                pass
14432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels_condition.release()
14442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
def use_mux(request):
    """Return request.mux if the request has a mux attribute, else False."""
    if not hasattr(request, 'mux'):
        return False
    return request.mux
14482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
def start(request, dispatcher):
    """Run a mux handler for the given request.

    Creates a _MuxHandler from request and dispatcher, starts it, adds
    _INITIAL_NUMBER_OF_CHANNEL_SLOTS channel slots with
    _INITIAL_QUOTA_FOR_CLIENT as the quota, and then calls
    wait_until_done() on the handler.

    Args:
        request: passed as the first argument to _MuxHandler.
        dispatcher: passed as the second argument to _MuxHandler.
    """
    handler = _MuxHandler(request, dispatcher)
    handler.start()

    # Channel slots are added only after the handler has been started.
    handler.add_channel_slots(
        _INITIAL_NUMBER_OF_CHANNEL_SLOTS, _INITIAL_QUOTA_FOR_CLIENT)

    handler.wait_until_done()
14582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
14602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# vi:sts=4 sw=4 et
1461