12da489cd246702bee5938545b18a6f710ed214bcJamie Gennis#!/usr/bin/env python 22da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# 32da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# Copyright 2012, Google Inc. 42da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# All rights reserved. 52da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# 62da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# Redistribution and use in source and binary forms, with or without 72da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# modification, are permitted provided that the following conditions are 82da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# met: 92da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# 102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# * Redistributions of source code must retain the above copyright 112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# notice, this list of conditions and the following disclaimer. 122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# * Redistributions in binary form must reproduce the above 132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# copyright notice, this list of conditions and the following disclaimer 142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# in the documentation and/or other materials provided with the 152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# distribution. 162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# * Neither the name of Google Inc. nor the names of its 172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# contributors may be used to endorse or promote products derived from 182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# this software without specific prior written permission. 192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# 202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis"""WebSocket client utility for testing mux extension. 342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 352da489cd246702bee5938545b18a6f710ed214bcJamie GennisThis code should be independent from mod_pywebsocket. See the comment of 362da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclient_for_testing.py. 372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 382da489cd246702bee5938545b18a6f710ed214bcJamie GennisNOTE: This code is far from robust like client_for_testing.py. 392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis""" 402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 432da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport Queue 442da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport base64 452da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport collections 462da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport email 472da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport email.parser 482da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport logging 492da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport math 502da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport os 512da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport random 522da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport socket 532da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport struct 542da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport threading 552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 562da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket import util 572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 582da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom test import client_for_testing 592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_CONTROL_CHANNEL_ID = 0 622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_DEFAULT_CHANNEL_ID = 1 632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0 652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1 662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_FLOW_CONTROL = 2 672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_DROP_CHANNEL = 3 682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_NEW_CHANNEL_SLOT = 4 692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 712da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _ControlBlock: 722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, opcode): 732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.opcode = opcode 742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 762da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_handshake_response(response): 772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis status_line, header_lines = response.split('\r\n', 1) 782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis words = status_line.split(' ') 802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(words) < 3: 812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Bad Status-Line syntax %r' % status_line) 822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis [version, response_code] = words[:2] 832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if version != 'HTTP/1.1': 842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Bad response version %r' % version) 852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if response_code != '101': 872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Bad response code %r ' % response_code) 882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis headers = email.parser.Parser().parsestr(header_lines) 892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return headers 902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 922da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_channel_id(data, offset=0): 932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis length = len(data) 942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis remaining = length - offset 952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if remaining <= 0: 972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('No channel id found') 982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id = ord(data[offset]) 1002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id_length = 1 1012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id & 0xe0 == 0xe0: 1022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if remaining < 4: 1032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Invalid channel id format') 1042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id = struct.unpack('!L', 1052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis data[offset:offset+4])[0] & 0x1fffffff 1062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id_length = 4 1072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif channel_id & 0xc0 == 0xc0: 1082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if remaining < 3: 1092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Invalid channel id format') 1102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id = (((channel_id & 0x1f) << 16) + 1112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis struct.unpack('!H', data[offset+1:offset+3])[0]) 1122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id_length = 3 1132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif channel_id & 0x80 == 0x80: 1142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if remaining < 2: 1152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Invalid channel id format') 1162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id = struct.unpack('!H', data[offset:offset+2])[0] & 0x3fff 1172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id_length = 2 1182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return channel_id, channel_id_length 1202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1222da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _read_number(data, size_of_size, offset=0): 1232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if size_of_size == 1: 1242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return ord(data[offset]) 1252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif size_of_size == 2: 1262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.unpack('!H', data[offset:offset+2])[0] 1272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif size_of_size == 3: 1282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return ((ord(data[offset]) << 16) 1292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis + struct.unpack('!H', data[offset+1:offset+3])[0]) 1302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif size_of_size == 4: 1312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.unpack('!L', data[offset:offset+4])[0] 1322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 1332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Invalid "size of size" in control block') 1342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1362da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_control_block_specific_data(data, size_of_size, offset=0): 1372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis remaining = len(data) - offset 1382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if remaining < size_of_size: 1392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Invalid control block received') 1402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size = _read_number(data, size_of_size, offset) 1422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis start_position = offset + size_of_size 1442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis end_position = start_position + size 1452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(data) < end_position: 1462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Invalid size of control block (%d < %d)' % ( 1472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis len(data), end_position)) 1482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return data[start_position:end_position], size_of_size + size 1492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1512da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_control_blocks(data): 1522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis blocks = [] 1532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis length = len(data) 1542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos = 0 1552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while pos < length: 1572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = ord(data[pos]) 1582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos += 1 1592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = (first_byte >> 5) & 0x7 1602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block = _ControlBlock(opcode) 1612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # TODO(bashi): Support more opcode 1632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE: 1642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.encode = (first_byte >> 2) & 3 1652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.rejected = (first_byte >> 4) & 1 1662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id, advance = _parse_channel_id(data, pos) 1682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.channel_id = channel_id 1692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos += advance 1702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_size = (first_byte & 3) + 1 1722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoded_handshake, advance = _parse_control_block_specific_data( 1732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis data, size_of_size, pos) 1742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.encoded_handshake = encoded_handshake 1752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos += advance 1762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis blocks.append(block) 1772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode == _MUX_OPCODE_DROP_CHANNEL: 1782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.mux_error = (first_byte >> 4) & 1 1792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id, channel_id_length = _parse_channel_id(data, pos) 1812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.channel_id = channel_id 1822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos += channel_id_length 1832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_size = first_byte & 3 1852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis reason, size = _parse_control_block_specific_data( 1862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis data, size_of_size, pos) 1872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.reason = reason 1882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos += size 1892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis blocks.append(block) 1902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode == _MUX_OPCODE_FLOW_CONTROL: 1912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id, advance = _parse_channel_id(data, pos) 1922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.channel_id = channel_id 1932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos += advance 1942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_quota = (first_byte & 3) + 1 1952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.send_quota = _read_number(data, size_of_quota, pos) 1962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos += size_of_quota 1972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis blocks.append(block) 1982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: 1992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_slots = ((first_byte >> 2) & 3) + 1 2002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_quota = (first_byte & 3) + 1 2012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.slots = _read_number(data, size_of_slots, pos) 2022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos += size_of_slots 2032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.send_quota = _read_number(data, size_of_quota, pos) 2042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos += size_of_quota 2052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis blocks.append(block) 2062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 2072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception( 2082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Unsupported mux opcode %d received' % opcode) 2092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return blocks 2112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2132da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _encode_channel_id(channel_id): 2142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 0: 2152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Channel id %d must not be negative' % channel_id) 2162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 2 ** 7: 2182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return chr(channel_id) 2192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 2 ** 14: 2202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.pack('!H', 0x8000 + channel_id) 2212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 2 ** 21: 2222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first = chr(0xc0 + (channel_id >> 16)) 2232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return first + struct.pack('!H', channel_id & 0xffff) 2242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 2 ** 29: 2252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.pack('!L', 0xe0000000 + channel_id) 2262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Channel id %d is too large' % channel_id) 2282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2302da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _size_of_number_in_bytes_minus_1(number): 2312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Calculate the minimum number of bytes minus 1 that are required to store 2322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # the data. 2332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if number < 0: 2342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid number: %d' % number) 2352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 8: 2362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 0 2372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 16: 2382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 1 2392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 24: 2402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 2 2412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 32: 2422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 3 2432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 2442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid number %d' % number) 2452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2472da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _encode_number(number): 2482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if number < 2 ** 8: 2492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return chr(number) 2502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 16: 2512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.pack('!H', number) 2522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 24: 2532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return chr(number >> 16) + struct.pack('!H', number & 0xffff) 2542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 2552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.pack('!L', number) 2562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2582da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _create_add_channel_request(channel_id, encoded_handshake, 2592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoding=0): 2602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis length = len(encoded_handshake) 2612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_length = _size_of_number_in_bytes_minus_1(length) 2622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = ((_MUX_OPCODE_ADD_CHANNEL_REQUEST << 5) | (encoding << 2) | 2642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_length) 2652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoded_length = _encode_number(length) 2662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return (chr(first_byte) + _encode_channel_id(channel_id) + 2682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoded_length + encoded_handshake) 2692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2712da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _create_flow_control(channel_id, replenished_quota): 2722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_quota = _size_of_number_in_bytes_minus_1(replenished_quota) 2732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = ((_MUX_OPCODE_FLOW_CONTROL << 5) | size_of_quota) 2742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return (chr(first_byte) + _encode_channel_id(channel_id) + 2752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _encode_number(replenished_quota)) 2762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2782da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _MuxReaderThread(threading.Thread): 2792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Mux reader thread. 2802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Reads frames and passes them to the mux client. This thread accesses 2822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis private functions/variables of the mux client. 2832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 2842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, mux): 2862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis threading.Thread.__init__(self) 2872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.setDaemon(True) 2882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux = mux 2892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stop_requested = False 2902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _receive_message(self): 2922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_opcode = None 2932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pending_payload = [] 2942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while not self._stop_requested: 2952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fin, rsv1, rsv2, rsv3, opcode, payload_length = ( 2962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis client_for_testing.read_frame_header(self._mux._socket)) 2972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not first_opcode: 2992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if opcode == client_for_testing.OPCODE_TEXT: 3002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Received a text message on physical ' 3012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'connection') 3022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if opcode == client_for_testing.OPCODE_CONTINUATION: 3032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Received an intermediate frame but ' 3042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'fragmentation was not started') 3052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if (opcode == client_for_testing.OPCODE_BINARY or 3062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode == client_for_testing.OPCODE_PONG or 3072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode == client_for_testing.OPCODE_PONG or 3082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode == client_for_testing.OPCODE_CLOSE): 3092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_opcode = opcode 3102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 3112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Received an undefined opcode frame: %d' % 3122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode) 3132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode != client_for_testing.OPCODE_CONTINUATION: 3152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Received a new opcode before ' 3162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'terminating fragmentation') 3172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = client_for_testing.receive_bytes( 3192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux._socket, payload_length) 3202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._mux._incoming_frame_filter is not None: 3222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = self._mux._incoming_frame_filter.filter(payload) 3232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pending_payload.append(payload) 3252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if fin: 3272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis break 3282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._stop_requested: 3302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return None, None 3312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis message = ''.join(pending_payload) 3332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return first_opcode, message 3342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def request_stop(self): 3362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stop_requested = True 3372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def run(self): 3392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 3402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while not self._stop_requested: 3412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # opcode is OPCODE_BINARY or control opcodes when a message 3422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # is succesfully received. 3432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode, message = self._receive_message() 3442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not opcode: 3452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 3462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if opcode == client_for_testing.OPCODE_BINARY: 3472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id, advance = _parse_channel_id(message) 3482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux._dispatch_frame(channel_id, message[advance:]) 3492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 3502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux._process_control_message(opcode, message) 3512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 3522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux._notify_reader_done() 3532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3552da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _InnerFrame(object): 3562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, fin, rsv1, rsv2, rsv3, opcode, payload): 3572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.fin = fin 3582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.rsv1 = rsv1 3592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.rsv2 = rsv2 3602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.rsv3 = rsv3 3612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.opcode = opcode 3622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.payload = payload 3632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3652da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _LogicalChannelData(object): 3662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self): 3672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.queue = Queue.Queue() 3682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.send_quota = 0 3692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.receive_quota = 0 3702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3722da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass MuxClient(object): 3732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """WebSocket mux client. 3742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Note that this class is NOT thread-safe. Do not access an instance of this 3762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis class from multiple threads at a same time. 3772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 3782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, options): 3802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger = util.get_class_logger(self) 3812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._options = options 3832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._options.enable_mux() 3842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stream = None 3852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._socket = None 3862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._handshake = client_for_testing.WebSocketHandshake(self._options) 3872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._incoming_frame_filter = None 3882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._outgoing_frame_filter = None 3892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._is_active = False 3912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_thread = None 3922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition = threading.Condition() 3932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks = [] 3942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._channel_slots = collections.deque() 3952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition = threading.Condition(); 3962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels = {} 3972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._timeout = 2 3982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._physical_connection_close_event = None 3992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._physical_connection_close_message = None 4002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _parse_inner_frame(self, data): 4022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(data) == 0: 4032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Invalid encapsulated frame received') 4042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = ord(data[0]) 4062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fin = (first_byte << 7) & 1 4072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis rsv1 = (first_byte << 6) & 1 4082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis rsv2 = (first_byte << 5) & 1 4092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis rsv3 = (first_byte << 4) & 1 4102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = first_byte & 0xf 4112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._outgoing_frame_filter: 4132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = self._outgoing_frame_filter.filter( 4142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis data[1:]) 4152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 4162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = data[1:] 4172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return _InnerFrame(fin, rsv1, rsv2, rsv3, opcode, payload) 4192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _process_mux_control_blocks(self): 4212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis for block in self._control_blocks: 4222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if block.opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE: 4232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # AddChannelResponse will be handled in add_channel(). 4242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis continue 4252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif block.opcode == _MUX_OPCODE_FLOW_CONTROL: 4262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 4272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 4282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not block.channel_id in self._logical_channels: 4292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Invalid flow control received for ' 4302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'channel id %d' % block.channel_id) 4312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[block.channel_id].send_quota += ( 4322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.send_quota) 4332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.notify() 4342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 4352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 4362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif block.opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: 4372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._channel_slots.extend([block.send_quota] * block.slots) 4382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _dispatch_frame(self, channel_id, payload): 4402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id == _CONTROL_CHANNEL_ID: 4412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 4422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition.acquire() 4432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks += _parse_control_blocks(payload) 4442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._process_mux_control_blocks() 4452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition.notify() 4462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 4472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition.release() 4482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 4492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 4502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 4512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not channel_id in self._logical_channels: 4522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Received logical frame on channel id ' 4532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis '%d, which is not established' % 4542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id) 4552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis inner_frame = self._parse_inner_frame(payload) 4572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[channel_id].receive_quota -= ( 4582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis len(inner_frame.payload)) 4592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._logical_channels[channel_id].receive_quota < 0: 4602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('The server violates quota on ' 4612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'channel id %d' % channel_id) 4622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 4632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 4642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[channel_id].queue.put(inner_frame) 4652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _process_control_message(self, opcode, message): 4672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Ping/Pong are not supported. 4682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if opcode == client_for_testing.OPCODE_CLOSE: 4692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._physical_connection_close_message = message 4702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._is_active: 4712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stream.send_close( 4722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis code=client_for_testing.STATUS_NORMAL_CLOSURE, reason='') 4732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_thread.request_stop() 4742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._physical_connection_close_event: 4762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._physical_connection_close_event.set() 4772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _notify_reader_done(self): 4792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Read thread terminated.') 4802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.close_socket() 4812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _assert_channel_slot_available(self): 4832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 4842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition.acquire() 4852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(self._channel_slots) == 0: 4862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Wait once 4872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition.wait(timeout=self._timeout) 4882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 4892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition.release() 4902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(self._channel_slots) == 0: 4922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Failed to receive NewChannelSlot') 4932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _assert_send_quota_available(self, channel_id): 4952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 4962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 4972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._logical_channels[channel_id].send_quota == 0: 4982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Wait once 4992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.wait(timeout=self._timeout) 5002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 5012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 5022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._logical_channels[channel_id].send_quota == 0: 5042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Failed to receive FlowControl for channel id %d' % 5052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id) 5062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def connect(self): 5082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._socket = socket.socket() 5092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._socket.settimeout(self._options.socket_timeout) 5102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._socket.connect((self._options.server_host, 5122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._options.server_port)) 5132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._options.use_tls: 5142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._socket = _TLSSocket(self._socket) 5152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._handshake.handshake(self._socket) 5172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stream = client_for_testing.WebSocketStream( 5182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._socket, self._handshake) 5192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[_DEFAULT_CHANNEL_ID] = _LogicalChannelData() 5212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_thread = _MuxReaderThread(self) 5232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_thread.start() 5242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._assert_channel_slot_available() 5262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._assert_send_quota_available(_DEFAULT_CHANNEL_ID) 5272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._is_active = True 5292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.info('Connection established') 5302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def add_channel(self, channel_id, options): 5322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not self._is_active: 5332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Mux client is not active') 5342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id in self._logical_channels: 5362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Channel id %d already exists' % channel_id) 5372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 5392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis send_quota = self._channel_slots.popleft() 5402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except IndexError, e: 5412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('No channel slots: %r' % e) 5422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Create AddChannel request 5442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis request_line = 'GET %s HTTP/1.1\r\n' % options.resource 5452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields = [] 5462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields.append('Upgrade: websocket\r\n') 5472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields.append('Connection: Upgrade\r\n') 5482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if options.server_port == client_for_testing.DEFAULT_PORT: 5492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields.append('Host: %s\r\n' % options.server_host.lower()) 5502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 5512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields.append('Host: %s:%d\r\n' % (options.server_host.lower(), 5522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis options.server_port)) 5532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields.append('Origin: %s\r\n' % options.origin.lower()) 5542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis original_key = os.urandom(16) 5562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis key = base64.b64encode(original_key) 5572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields.append('Sec-WebSocket-Key: %s\r\n' % key) 5582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields.append('Sec-WebSocket-Version: 13\r\n') 5602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(options.extensions) > 0: 5622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields.append('Sec-WebSocket-Extensions: %s\r\n' % 5632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis ', '.join(options.extensions)) 5642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis handshake = request_line + ''.join(fields) + '\r\n' 5662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis add_channel_request = _create_add_channel_request( 5672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id, handshake) 5682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + add_channel_request 5692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stream.send_binary(payload) 5702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Wait AddChannelResponse 5722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Waiting AddChannelResponse for the request...') 5732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis response = None 5742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 5752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition.acquire() 5762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while True: 5772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis for block in self._control_blocks: 5782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if block.opcode != _MUX_OPCODE_ADD_CHANNEL_RESPONSE: 5792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis continue 5802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if block.channel_id == channel_id: 5812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis response = block 5822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks.remove(response) 5832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis break 5842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if response: 5852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis break 5862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition.wait(self._timeout) 5872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not self._is_active: 5882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('AddChannelRequest timed out') 5892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 5902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._control_blocks_condition.release() 5912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Validate AddChannelResponse 5932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if response.rejected: 5942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('The server rejected AddChannelRequest') 5952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fields = _parse_handshake_response(response.encoded_handshake) 5972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not 'upgrade' in fields: 5992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('No Upgrade header') 6002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if fields['upgrade'] != 'websocket': 6012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Wrong Upgrade header') 6022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not 'connection' in fields: 6032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('No Connection header') 6042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if fields['connection'] != 'Upgrade': 6052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Wrong Connection header') 6062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not 'sec-websocket-accept' in fields: 6072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('No Sec-WebSocket-Accept header') 6082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis accept = fields['sec-websocket-accept'] 6102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 6112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis decoded_accept = base64.b64decode(accept) 6122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except TypeError, e: 6132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception( 6142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Illegal value for header Sec-WebSocket-Accept: ' + accept) 6152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(decoded_accept) != 20: 6172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception( 6182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Decoded value of Sec-WebSocket-Accept is not 20-byte long') 6192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis original_expected_accept = util.sha1_hash( 6212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis key + client_for_testing.WEBSOCKET_ACCEPT_UUID).digest() 6222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis expected_accept = base64.b64encode(original_expected_accept) 6232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if accept != expected_accept: 6252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception( 6262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Invalid Sec-WebSocket-Accept header: %r (expected) != %r ' 6272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis '(actual)' % (accept, expected_accept)) 6282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 6302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[channel_id] = _LogicalChannelData() 6312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[channel_id].send_quota = send_quota 6322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 6332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Logical channel %d established' % channel_id) 6352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _check_logical_channel_is_opened(self, channel_id): 6372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not self._is_active: 6382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Mux client is not active') 6392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not channel_id in self._logical_channels: 6412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Logical channel %d is not established.') 6422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def drop_channel(self, channel_id): 6442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # TODO(bashi): Implement 6452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pass 6462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def send_flow_control(self, channel_id, replenished_quota): 6482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._check_logical_channel_is_opened(channel_id) 6492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis flow_control = _create_flow_control(channel_id, replenished_quota) 6502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + flow_control 6512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Replenish receive quota 6522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 6532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 6542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[channel_id].receive_quota += ( 6552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis replenished_quota) 6562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 6572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 6582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stream.send_binary(payload) 6592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def send_message(self, channel_id, message, end=True, binary=False): 6612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._check_logical_channel_is_opened(channel_id) 6622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if binary: 6642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = (end << 7) | client_for_testing.OPCODE_BINARY 6652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 6662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = (end << 7) | client_for_testing.OPCODE_TEXT 6672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis message = message.encode('utf-8') 6682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 6702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 6712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._logical_channels[channel_id].send_quota < len(message): 6722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Send quota violation: %d < %d' % ( 6732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[channel_id].send_quota, 6742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis len(message))) 6752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[channel_id].send_quota -= len(message) 6772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 6782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 6792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = _encode_channel_id(channel_id) + chr(first_byte) + message 6802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stream.send_binary(payload) 6812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def assert_receive(self, channel_id, payload, binary=False): 6832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._check_logical_channel_is_opened(channel_id) 6842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 6862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis inner_frame = self._logical_channels[channel_id].queue.get( 6872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis timeout=self._timeout) 6882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except Queue.Empty, e: 6892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Cannot receive message from channel id %d' % 6902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id) 6912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if binary: 6932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = client_for_testing.OPCODE_BINARY 6942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 6952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = client_for_testing.OPCODE_TEXT 6962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if inner_frame.opcode != opcode: 6982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Unexpected opcode received (%r != %r)' % 6992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (expected_opcode, inner_frame.opcode)) 7002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if inner_frame.payload != payload: 7022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Unexpected payload received') 7032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def send_close(self, channel_id, code=None, reason=''): 7052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._check_logical_channel_is_opened(channel_id) 7062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if code is not None: 7082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis body = struct.pack('!H', code) + reason.encode('utf-8') 7092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 7102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis body = '' 7112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = (1 << 7) | client_for_testing.OPCODE_CLOSE 7132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = _encode_channel_id(channel_id) + chr(first_byte) + body 7142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stream.send_binary(payload) 7152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def assert_receive_close(self, channel_id): 7172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._check_logical_channel_is_opened(channel_id) 7182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 7202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis inner_frame = self._logical_channels[channel_id].queue.get( 7212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis timeout=self._timeout) 7222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except Queue.Empty, e: 7232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Cannot receive message from channel id %d' % 7242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id) 7252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if inner_frame.opcode != client_for_testing.OPCODE_CLOSE: 7262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Didn\'t receive close frame') 7272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def send_physical_connection_close(self, code=None, reason=''): 7292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._physical_connection_close_event = threading.Event() 7302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stream.send_close(code, reason) 7312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # This method can be used only after calling 7332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # send_physical_connection_close(). 7342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def assert_physical_connection_receive_close( 7352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self, code=client_for_testing.STATUS_NORMAL_CLOSURE, reason=''): 7362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._physical_connection_close_event.wait(timeout=self._timeout) 7372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if (not self._physical_connection_close_event.isSet() or 7382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis not self._physical_connection_close_message): 7392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise Exception('Didn\'t receive closing handshake') 7402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def close_socket(self): 7422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._is_active = False 7432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._socket.close() 744