12da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#!/usr/bin/env python
22da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#
32da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# Copyright 2012, Google Inc.
42da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# All rights reserved.
52da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#
62da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# Redistribution and use in source and binary forms, with or without
72da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# modification, are permitted provided that the following conditions are
82da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# met:
92da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#
102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#     * Redistributions of source code must retain the above copyright
112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# notice, this list of conditions and the following disclaimer.
122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#     * Redistributions in binary form must reproduce the above
132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# copyright notice, this list of conditions and the following disclaimer
142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# in the documentation and/or other materials provided with the
152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# distribution.
162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#     * Neither the name of Google Inc. nor the names of its
172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# contributors may be used to endorse or promote products derived from
182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# this software without specific prior written permission.
192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#
202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis"""WebSocket client utility for testing mux extension.
342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
352da489cd246702bee5938545b18a6f710ed214bcJamie GennisThis code should be independent from mod_pywebsocket. See the comment of
362da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclient_for_testing.py.
372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
382da489cd246702bee5938545b18a6f710ed214bcJamie GennisNOTE: This code is far from robust like client_for_testing.py.
392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis"""
402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
432da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport Queue
442da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport base64
452da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport collections
462da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport email
472da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport email.parser
482da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport logging
492da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport math
502da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport os
512da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport random
522da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport socket
532da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport struct
542da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport threading
552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
562da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket import util
572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
582da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom test import client_for_testing
592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_CONTROL_CHANNEL_ID = 0
622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_DEFAULT_CHANNEL_ID = 1
632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_FLOW_CONTROL = 2
672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_DROP_CHANNEL = 3
682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_NEW_CHANNEL_SLOT = 4
692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
712da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _ControlBlock:
722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self, opcode):
732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.opcode = opcode
742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
762da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_handshake_response(response):
772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    status_line, header_lines = response.split('\r\n', 1)
782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    words = status_line.split(' ')
802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if len(words) < 3:
812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Bad Status-Line syntax %r' % status_line)
822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    [version, response_code] = words[:2]
832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if version != 'HTTP/1.1':
842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Bad response version %r' % version)
852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if response_code != '101':
872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Bad response code %r ' % response_code)
882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    headers = email.parser.Parser().parsestr(header_lines)
892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    return headers
902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
922da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_channel_id(data, offset=0):
932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    length = len(data)
942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    remaining = length - offset
952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if remaining <= 0:
972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise Exception('No channel id found')
982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    channel_id = ord(data[offset])
1002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    channel_id_length = 1
1012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id & 0xe0 == 0xe0:
1022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if remaining < 4:
1032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Invalid channel id format')
1042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        channel_id = struct.unpack('!L',
1052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                   data[offset:offset+4])[0] & 0x1fffffff
1062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        channel_id_length = 4
1072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif channel_id & 0xc0 == 0xc0:
1082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if remaining < 3:
1092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Invalid channel id format')
1102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        channel_id = (((channel_id & 0x1f) << 16) +
1112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                      struct.unpack('!H', data[offset+1:offset+3])[0])
1122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        channel_id_length = 3
1132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif channel_id & 0x80 == 0x80:
1142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if remaining < 2:
1152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Invalid channel id format')
1162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        channel_id = struct.unpack('!H', data[offset:offset+2])[0] & 0x3fff
1172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        channel_id_length = 2
1182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    return channel_id, channel_id_length
1202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1222da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _read_number(data, size_of_size, offset=0):
1232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if size_of_size == 1:
1242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return ord(data[offset])
1252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif size_of_size == 2:
1262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.unpack('!H', data[offset:offset+2])[0]
1272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif size_of_size == 3:
1282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return ((ord(data[offset]) << 16)
1292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                + struct.unpack('!H', data[offset+1:offset+3])[0])
1302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif size_of_size == 4:
1312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.unpack('!L', data[offset:offset+4])[0]
1322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    else:
1332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise Exception('Invalid "size of size" in control block')
1342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1362da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_control_block_specific_data(data, size_of_size, offset=0):
1372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    remaining = len(data) - offset
1382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if remaining < size_of_size:
1392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise Exception('Invalid control block received')
1402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    size = _read_number(data, size_of_size, offset)
1422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    start_position = offset + size_of_size
1442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    end_position = start_position + size
1452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if len(data) < end_position:
1462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise Exception('Invalid size of control block (%d < %d)' % (
1472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                len(data), end_position))
1482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    return data[start_position:end_position], size_of_size + size
1492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1512da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_control_blocks(data):
1522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    blocks = []
1532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    length = len(data)
1542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    pos = 0
1552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    while pos < length:
1572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        first_byte = ord(data[pos])
1582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        pos += 1
1592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        opcode = (first_byte >> 5) & 0x7
1602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        block = _ControlBlock(opcode)
1612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # TODO(bashi): Support more opcode
1632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
1642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.encode = (first_byte >> 2) & 3
1652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.rejected = (first_byte >> 4) & 1
1662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id, advance = _parse_channel_id(data, pos)
1682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.channel_id = channel_id
1692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            pos += advance
1702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            size_of_size = (first_byte & 3) + 1
1722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            encoded_handshake, advance = _parse_control_block_specific_data(
1732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                data, size_of_size, pos)
1742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.encoded_handshake = encoded_handshake
1752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            pos += advance
1762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            blocks.append(block)
1772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        elif opcode == _MUX_OPCODE_DROP_CHANNEL:
1782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.mux_error = (first_byte >> 4) & 1
1792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id, channel_id_length = _parse_channel_id(data, pos)
1812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.channel_id = channel_id
1822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            pos += channel_id_length
1832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
1842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            size_of_size = first_byte & 3
1852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            reason, size = _parse_control_block_specific_data(
1862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                data, size_of_size, pos)
1872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.reason = reason
1882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            pos += size
1892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            blocks.append(block)
1902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        elif opcode == _MUX_OPCODE_FLOW_CONTROL:
1912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id, advance = _parse_channel_id(data, pos)
1922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.channel_id = channel_id
1932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            pos += advance
1942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            size_of_quota = (first_byte & 3) + 1
1952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.send_quota = _read_number(data, size_of_quota, pos)
1962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            pos += size_of_quota
1972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            blocks.append(block)
1982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
1992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            size_of_slots = ((first_byte >> 2) & 3) + 1
2002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            size_of_quota = (first_byte & 3) + 1
2012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.slots = _read_number(data, size_of_slots, pos)
2022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            pos += size_of_slots
2032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            block.send_quota = _read_number(data, size_of_quota, pos)
2042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            pos += size_of_quota
2052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            blocks.append(block)
2062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
2072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception(
2082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Unsupported mux opcode %d received' % opcode)
2092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    return blocks
2112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2132da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _encode_channel_id(channel_id):
2142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 0:
2152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Channel id %d must not be negative' % channel_id)
2162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 2 ** 7:
2182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return chr(channel_id)
2192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 2 ** 14:
2202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.pack('!H', 0x8000 + channel_id)
2212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 2 ** 21:
2222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        first = chr(0xc0 + (channel_id >> 16))
2232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return first + struct.pack('!H', channel_id & 0xffff)
2242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if channel_id < 2 ** 29:
2252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.pack('!L', 0xe0000000 + channel_id)
2262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    raise ValueError('Channel id %d is too large' % channel_id)
2282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2302da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _size_of_number_in_bytes_minus_1(number):
2312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    # Calculate the minimum number of bytes minus 1 that are required to store
2322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    # the data.
2332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if number < 0:
2342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Invalid number: %d' % number)
2352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 8:
2362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return 0
2372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 16:
2382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return 1
2392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 24:
2402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return 2
2412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 32:
2422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return 3
2432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    else:
2442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        raise ValueError('Invalid number %d' % number)
2452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2472da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _encode_number(number):
2482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    if number < 2 ** 8:
2492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return chr(number)
2502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 16:
2512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.pack('!H', number)
2522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    elif number < 2 ** 24:
2532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return chr(number >> 16) + struct.pack('!H', number & 0xffff)
2542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    else:
2552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return struct.pack('!L', number)
2562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2582da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _create_add_channel_request(channel_id, encoded_handshake,
2592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                encoding=0):
2602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    length = len(encoded_handshake)
2612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    size_of_length = _size_of_number_in_bytes_minus_1(length)
2622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    first_byte = ((_MUX_OPCODE_ADD_CHANNEL_REQUEST << 5) | (encoding << 2) |
2642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                  size_of_length)
2652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    encoded_length = _encode_number(length)
2662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    return (chr(first_byte) + _encode_channel_id(channel_id) +
2682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            encoded_length + encoded_handshake)
2692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2712da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _create_flow_control(channel_id, replenished_quota):
2722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    size_of_quota = _size_of_number_in_bytes_minus_1(replenished_quota)
2732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    first_byte = ((_MUX_OPCODE_FLOW_CONTROL << 5) | size_of_quota)
2742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    return (chr(first_byte) + _encode_channel_id(channel_id) +
2752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            _encode_number(replenished_quota))
2762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2782da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _MuxReaderThread(threading.Thread):
2792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """Mux reader thread.
2802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    Reads frames and passes them to the mux client. This thread accesses
2822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    private functions/variables of the mux client.
2832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """
2842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self, mux):
2862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        threading.Thread.__init__(self)
2872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.setDaemon(True)
2882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._mux = mux
2892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stop_requested = False
2902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _receive_message(self):
2922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        first_opcode = None
2932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        pending_payload = []
2942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        while not self._stop_requested:
2952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            fin, rsv1, rsv2, rsv3, opcode, payload_length = (
2962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                client_for_testing.read_frame_header(self._mux._socket))
2972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
2982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if not first_opcode:
2992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if opcode == client_for_testing.OPCODE_TEXT:
3002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    raise Exception('Received a text message on physical '
3012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                    'connection')
3022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if opcode == client_for_testing.OPCODE_CONTINUATION:
3032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    raise Exception('Received an intermediate frame but '
3042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                    'fragmentation was not started')
3052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if (opcode == client_for_testing.OPCODE_BINARY or
3062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    opcode == client_for_testing.OPCODE_PONG or
3072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    opcode == client_for_testing.OPCODE_PONG or
3082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    opcode == client_for_testing.OPCODE_CLOSE):
3092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    first_opcode = opcode
3102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                else:
3112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    raise Exception('Received an undefined opcode frame: %d' %
3122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                    opcode)
3132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            elif opcode != client_for_testing.OPCODE_CONTINUATION:
3152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise Exception('Received a new opcode before '
3162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                'terminating fragmentation')
3172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            payload = client_for_testing.receive_bytes(
3192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._mux._socket, payload_length)
3202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if self._mux._incoming_frame_filter is not None:
3222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                payload = self._mux._incoming_frame_filter.filter(payload)
3232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            pending_payload.append(payload)
3252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if fin:
3272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                break
3282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._stop_requested:
3302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            return None, None
3312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        message = ''.join(pending_payload)
3332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return first_opcode, message
3342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def request_stop(self):
3362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stop_requested = True
3372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def run(self):
3392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
3402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            while not self._stop_requested:
3412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                # opcode is OPCODE_BINARY or control opcodes when a message
3422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                # is succesfully received.
3432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                opcode, message = self._receive_message()
3442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if not opcode:
3452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    return
3462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if opcode == client_for_testing.OPCODE_BINARY:
3472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    channel_id, advance = _parse_channel_id(message)
3482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._mux._dispatch_frame(channel_id, message[advance:])
3492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                else:
3502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._mux._process_control_message(opcode, message)
3512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
3522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._mux._notify_reader_done()
3532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3552da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _InnerFrame(object):
3562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self, fin, rsv1, rsv2, rsv3, opcode, payload):
3572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.fin = fin
3582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.rsv1 = rsv1
3592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.rsv2 = rsv2
3602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.rsv3 = rsv3
3612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.opcode = opcode
3622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.payload = payload
3632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3652da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _LogicalChannelData(object):
3662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self):
3672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.queue = Queue.Queue()
3682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.send_quota = 0
3692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.receive_quota = 0
3702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3722da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass MuxClient(object):
3732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """WebSocket mux client.
3742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    Note that this class is NOT thread-safe. Do not access an instance of this
3762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    class from multiple threads at a same time.
3772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    """
3782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def __init__(self, options):
3802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger = util.get_class_logger(self)
3812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._options = options
3832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._options.enable_mux()
3842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stream = None
3852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._socket = None
3862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._handshake = client_for_testing.WebSocketHandshake(self._options)
3872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._incoming_frame_filter = None
3882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._outgoing_frame_filter = None
3892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
3902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._is_active = False
3912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._read_thread = None
3922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._control_blocks_condition = threading.Condition()
3932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._control_blocks = []
3942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._channel_slots = collections.deque()
3952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels_condition = threading.Condition();
3962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels = {}
3972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._timeout = 2
3982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._physical_connection_close_event = None
3992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._physical_connection_close_message = None
4002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _parse_inner_frame(self, data):
4022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if len(data) == 0:
4032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Invalid encapsulated frame received')
4042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        first_byte = ord(data[0])
4062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        fin = (first_byte << 7) & 1
4072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        rsv1 = (first_byte << 6) & 1
4082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        rsv2 = (first_byte << 5) & 1
4092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        rsv3 = (first_byte << 4) & 1
4102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        opcode = first_byte & 0xf
4112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._outgoing_frame_filter:
4132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            payload = self._outgoing_frame_filter.filter(
4142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                data[1:])
4152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
4162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            payload = data[1:]
4172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        return _InnerFrame(fin, rsv1, rsv2, rsv3, opcode, payload)
4192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _process_mux_control_blocks(self):
4212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        for block in self._control_blocks:
4222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if block.opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
4232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                # AddChannelResponse will be handled in add_channel().
4242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                continue
4252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            elif block.opcode == _MUX_OPCODE_FLOW_CONTROL:
4262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                try:
4272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._logical_channels_condition.acquire()
4282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    if not block.channel_id in self._logical_channels:
4292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        raise Exception('Invalid flow control received for '
4302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                        'channel id %d' % block.channel_id)
4312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._logical_channels[block.channel_id].send_quota += (
4322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        block.send_quota)
4332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._logical_channels_condition.notify()
4342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                finally:
4352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    self._logical_channels_condition.release()
4362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            elif block.opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
4372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._channel_slots.extend([block.send_quota] * block.slots)
4382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _dispatch_frame(self, channel_id, payload):
4402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if channel_id == _CONTROL_CHANNEL_ID:
4412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            try:
4422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._control_blocks_condition.acquire()
4432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._control_blocks += _parse_control_blocks(payload)
4442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._process_mux_control_blocks()
4452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._control_blocks_condition.notify()
4462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            finally:
4472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._control_blocks_condition.release()
4482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
4492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            try:
4502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logical_channels_condition.acquire()
4512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if not channel_id in self._logical_channels:
4522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    raise Exception('Received logical frame on channel id '
4532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                    '%d, which is not established' %
4542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                    channel_id)
4552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                inner_frame = self._parse_inner_frame(payload)
4572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logical_channels[channel_id].receive_quota -= (
4582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        len(inner_frame.payload))
4592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if self._logical_channels[channel_id].receive_quota < 0:
4602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    raise Exception('The server violates quota on '
4612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                    'channel id %d' % channel_id)
4622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            finally:
4632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logical_channels_condition.release()
4642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels[channel_id].queue.put(inner_frame)
4652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _process_control_message(self, opcode, message):
4672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Ping/Pong are not supported.
4682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if opcode == client_for_testing.OPCODE_CLOSE:
4692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._physical_connection_close_message = message
4702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if self._is_active:
4712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._stream.send_close(
4722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    code=client_for_testing.STATUS_NORMAL_CLOSURE, reason='')
4732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._read_thread.request_stop()
4742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if self._physical_connection_close_event:
4762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._physical_connection_close_event.set()
4772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _notify_reader_done(self):
4792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Read thread terminated.')
4802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self.close_socket()
4812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _assert_channel_slot_available(self):
4832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
4842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._control_blocks_condition.acquire()
4852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if len(self._channel_slots) == 0:
4862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                # Wait once
4872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._control_blocks_condition.wait(timeout=self._timeout)
4882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
4892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._control_blocks_condition.release()
4902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if len(self._channel_slots) == 0:
4922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Failed to receive NewChannelSlot')
4932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
4942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _assert_send_quota_available(self, channel_id):
4952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
4962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.acquire()
4972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if self._logical_channels[channel_id].send_quota == 0:
4982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                # Wait once
4992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._logical_channels_condition.wait(timeout=self._timeout)
5002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
5012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
5022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._logical_channels[channel_id].send_quota == 0:
5042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Failed to receive FlowControl for channel id %d' %
5052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                            channel_id)
5062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def connect(self):
5082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._socket = socket.socket()
5092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._socket.settimeout(self._options.socket_timeout)
5102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._socket.connect((self._options.server_host,
5122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                              self._options.server_port))
5132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if self._options.use_tls:
5142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._socket = _TLSSocket(self._socket)
5152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._handshake.handshake(self._socket)
5172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stream = client_for_testing.WebSocketStream(
5182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._socket, self._handshake)
5192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels[_DEFAULT_CHANNEL_ID] = _LogicalChannelData()
5212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._read_thread = _MuxReaderThread(self)
5232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._read_thread.start()
5242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._assert_channel_slot_available()
5262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._assert_send_quota_available(_DEFAULT_CHANNEL_ID)
5272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._is_active = True
5292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.info('Connection established')
5302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def add_channel(self, channel_id, options):
5322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if not self._is_active:
5332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Mux client is not active')
5342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if channel_id in self._logical_channels:
5362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Channel id %d already exists' % channel_id)
5372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
5392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            send_quota = self._channel_slots.popleft()
5402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except IndexError, e:
5412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('No channel slots: %r' % e)
5422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Create AddChannel request
5442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        request_line = 'GET %s HTTP/1.1\r\n' % options.resource
5452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        fields = []
5462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        fields.append('Upgrade: websocket\r\n')
5472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        fields.append('Connection: Upgrade\r\n')
5482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if options.server_port == client_for_testing.DEFAULT_PORT:
5492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            fields.append('Host: %s\r\n' % options.server_host.lower())
5502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
5512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            fields.append('Host: %s:%d\r\n' % (options.server_host.lower(),
5522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                                               options.server_port))
5532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        fields.append('Origin: %s\r\n' % options.origin.lower())
5542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        original_key = os.urandom(16)
5562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        key = base64.b64encode(original_key)
5572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        fields.append('Sec-WebSocket-Key: %s\r\n' % key)
5582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        fields.append('Sec-WebSocket-Version: 13\r\n')
5602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if len(options.extensions) > 0:
5622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            fields.append('Sec-WebSocket-Extensions: %s\r\n' %
5632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                          ', '.join(options.extensions))
5642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        handshake = request_line + ''.join(fields) + '\r\n'
5662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        add_channel_request = _create_add_channel_request(
5672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            channel_id, handshake)
5682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + add_channel_request
5692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stream.send_binary(payload)
5702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Wait AddChannelResponse
5722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Waiting AddChannelResponse for the request...')
5732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        response = None
5742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
5752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._control_blocks_condition.acquire()
5762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            while True:
5772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                for block in self._control_blocks:
5782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    if block.opcode != _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
5792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        continue
5802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    if block.channel_id == channel_id:
5812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        response = block
5822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        self._control_blocks.remove(response)
5832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        break
5842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if response:
5852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    break
5862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                self._control_blocks_condition.wait(self._timeout)
5872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                if not self._is_active:
5882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                    raise Exception('AddChannelRequest timed out')
5892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
5902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._control_blocks_condition.release()
5912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Validate AddChannelResponse
5932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if response.rejected:
5942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('The server rejected AddChannelRequest')
5952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        fields = _parse_handshake_response(response.encoded_handshake)
5972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
5982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if not 'upgrade' in fields:
5992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('No Upgrade header')
6002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if fields['upgrade'] != 'websocket':
6012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Wrong Upgrade header')
6022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if not 'connection' in fields:
6032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('No Connection header')
6042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if fields['connection'] != 'Upgrade':
6052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Wrong Connection header')
6062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if not 'sec-websocket-accept' in fields:
6072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('No Sec-WebSocket-Accept header')
6082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        accept = fields['sec-websocket-accept']
6102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
6112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            decoded_accept = base64.b64decode(accept)
6122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except TypeError, e:
6132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception(
6142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Illegal value for header Sec-WebSocket-Accept: ' + accept)
6152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if len(decoded_accept) != 20:
6172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception(
6182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Decoded value of Sec-WebSocket-Accept is not 20-byte long')
6192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        original_expected_accept = util.sha1_hash(
6212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            key + client_for_testing.WEBSOCKET_ACCEPT_UUID).digest()
6222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        expected_accept = base64.b64encode(original_expected_accept)
6232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if accept != expected_accept:
6252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception(
6262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                'Invalid Sec-WebSocket-Accept header: %r (expected) != %r '
6272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                '(actual)' % (accept, expected_accept))
6282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels_condition.acquire()
6302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels[channel_id] = _LogicalChannelData()
6312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels[channel_id].send_quota = send_quota
6322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logical_channels_condition.release()
6332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._logger.debug('Logical channel %d established' % channel_id)
6352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def _check_logical_channel_is_opened(self, channel_id):
6372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if not self._is_active:
6382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Mux client is not active')
6392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if not channel_id in self._logical_channels:
6412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Logical channel %d is not established.')
6422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def drop_channel(self, channel_id):
6442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # TODO(bashi): Implement
6452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        pass
6462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def send_flow_control(self, channel_id, replenished_quota):
6482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._check_logical_channel_is_opened(channel_id)
6492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        flow_control = _create_flow_control(channel_id, replenished_quota)
6502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + flow_control
6512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        # Replenish receive quota
6522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
6532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.acquire()
6542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels[channel_id].receive_quota += (
6552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                replenished_quota)
6562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
6572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
6582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stream.send_binary(payload)
6592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def send_message(self, channel_id, message, end=True, binary=False):
6612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._check_logical_channel_is_opened(channel_id)
6622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if binary:
6642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            first_byte = (end << 7) | client_for_testing.OPCODE_BINARY
6652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
6662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            first_byte = (end << 7) | client_for_testing.OPCODE_TEXT
6672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            message = message.encode('utf-8')
6682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
6702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.acquire()
6712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            if self._logical_channels[channel_id].send_quota < len(message):
6722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                raise Exception('Send quota violation: %d < %d' % (
6732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        self._logical_channels[channel_id].send_quota,
6742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                        len(message)))
6752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels[channel_id].send_quota -= len(message)
6772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        finally:
6782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            self._logical_channels_condition.release()
6792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        payload = _encode_channel_id(channel_id) + chr(first_byte) + message
6802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stream.send_binary(payload)
6812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def assert_receive(self, channel_id, payload, binary=False):
6832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._check_logical_channel_is_opened(channel_id)
6842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
6862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            inner_frame = self._logical_channels[channel_id].queue.get(
6872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                timeout=self._timeout)
6882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except Queue.Empty, e:
6892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Cannot receive message from channel id %d' %
6902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                            channel_id)
6912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if binary:
6932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            opcode = client_for_testing.OPCODE_BINARY
6942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
6952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            opcode = client_for_testing.OPCODE_TEXT
6962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
6972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if inner_frame.opcode != opcode:
6982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Unexpected opcode received (%r != %r)' %
6992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                            (expected_opcode, inner_frame.opcode))
7002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if inner_frame.payload != payload:
7022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Unexpected payload received')
7032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def send_close(self, channel_id, code=None, reason=''):
7052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._check_logical_channel_is_opened(channel_id)
7062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if code is not None:
7082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            body = struct.pack('!H', code) + reason.encode('utf-8')
7092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        else:
7102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            body = ''
7112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        first_byte = (1 << 7) | client_for_testing.OPCODE_CLOSE
7132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        payload = _encode_channel_id(channel_id) + chr(first_byte) + body
7142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stream.send_binary(payload)
7152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def assert_receive_close(self, channel_id):
7172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._check_logical_channel_is_opened(channel_id)
7182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        try:
7202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            inner_frame = self._logical_channels[channel_id].queue.get(
7212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                timeout=self._timeout)
7222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        except Queue.Empty, e:
7232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Cannot receive message from channel id %d' %
7242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis                            channel_id)
7252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if inner_frame.opcode != client_for_testing.OPCODE_CLOSE:
7262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Didn\'t receive close frame')
7272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def send_physical_connection_close(self, code=None, reason=''):
7292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._physical_connection_close_event = threading.Event()
7302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._stream.send_close(code, reason)
7312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    # This method can be used only after calling
7332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    # send_physical_connection_close().
7342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def assert_physical_connection_receive_close(
7352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self, code=client_for_testing.STATUS_NORMAL_CLOSURE, reason=''):
7362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._physical_connection_close_event.wait(timeout=self._timeout)
7372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        if (not self._physical_connection_close_event.isSet() or
7382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            not self._physical_connection_close_message):
7392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis            raise Exception('Didn\'t receive closing handshake')
7402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis
7412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis    def close_socket(self):
7422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._is_active = False
7432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis        self._socket.close()
744