12da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# Copyright 2012, Google Inc. 22da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# All rights reserved. 32da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# 42da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# Redistribution and use in source and binary forms, with or without 52da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# modification, are permitted provided that the following conditions are 62da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# met: 72da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# 82da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# * Redistributions of source code must retain the above copyright 92da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# notice, this list of conditions and the following disclaimer. 102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# * Redistributions in binary form must reproduce the above 112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# copyright notice, this list of conditions and the following disclaimer 122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# in the documentation and/or other materials provided with the 132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# distribution. 142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# * Neither the name of Google Inc. nor the names of its 152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# contributors may be used to endorse or promote products derived from 162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# this software without specific prior written permission. 
172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# 182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis"""This file provides classes and helper functions for multiplexing extension. 
322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 332da489cd246702bee5938545b18a6f710ed214bcJamie GennisSpecification: 342da489cd246702bee5938545b18a6f710ed214bcJamie Gennishttp://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-03 352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis""" 362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 382da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport collections 392da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport copy 402da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport email 412da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport email.parser 422da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport logging 432da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport math 442da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport struct 452da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport threading 462da489cd246702bee5938545b18a6f710ed214bcJamie Gennisimport traceback 472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 482da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket import common 492da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket import handshake 502da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket import util 512da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_base import BadOperationException 522da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_base import ConnectionTerminatedException 532da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import Frame 542da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import Stream 552da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import StreamOptions 562da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom 
mod_pywebsocket._stream_hybi import create_binary_frame 572da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import create_closing_handshake_body 582da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import create_header 592da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket._stream_hybi import parse_frame 602da489cd246702bee5938545b18a6f710ed214bcJamie Gennisfrom mod_pywebsocket.handshake import hybi 612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_CONTROL_CHANNEL_ID = 0 642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_DEFAULT_CHANNEL_ID = 1 652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0 672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1 682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_FLOW_CONTROL = 2 692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_DROP_CHANNEL = 3 702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MUX_OPCODE_NEW_CHANNEL_SLOT = 4 712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_MAX_CHANNEL_ID = 2 ** 29 - 1 732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64 752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis_INITIAL_QUOTA_FOR_CLIENT = 8 * 1024 762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# We need only these status code for now. 
_HTTP_BAD_RESPONSE_MESSAGES = {common.HTTP_STATUS_BAD_REQUEST: 'Bad Request'}


class MuxUnexpectedException(Exception):
    """Signals an unexpected condition while handling the multiplexing
    extension.
    """


# Temporary
class MuxNotImplementedException(Exception):
    """Signals that control flow reached a code path that is not implemented.
    """


class InvalidMuxFrameException(Exception):
    """Signals that an invalid multiplexed frame was received."""


class InvalidMuxControlBlockException(Exception):
    """Signals that an invalid multiplexing control block was received."""


Gennisclass LogicalConnectionClosedException(Exception): 1052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Raised when logical connection is gracefully closed.""" 1062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pass 1072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1092da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _encode_channel_id(channel_id): 1102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 0: 1112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Channel id %d must not be negative' % channel_id) 1122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 2 ** 7: 1142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return chr(channel_id) 1152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 2 ** 14: 1162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.pack('!H', 0x8000 + channel_id) 1172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 2 ** 21: 1182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first = chr(0xc0 + (channel_id >> 16)) 1192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return first + struct.pack('!H', channel_id & 0xffff) 1202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 2 ** 29: 1212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.pack('!L', 0xe0000000 + channel_id) 1222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Channel id %d is too large' % channel_id) 1242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1262da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _size_of_number_in_bytes_minus_1(number): 1272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Calculate the minimum number of bytes minus 1 that 
are required to store 1282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # the data. 1292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if number < 0: 1302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid number: %d' % number) 1312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 8: 1322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 0 1332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 16: 1342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 1 1352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 24: 1362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 2 1372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 32: 1382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 3 1392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 1402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid number %d' % number) 1412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1432da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _encode_number(number): 1442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if number < 2 ** 8: 1452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return chr(number) 1462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 16: 1472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.pack('!H', number) 1482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif number < 2 ** 24: 1492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return chr(number >> 16) + struct.pack('!H', number & 0xffff) 1502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 1512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.pack('!L', number) 1522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 
1542da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _create_control_block_length_value(channel_id, opcode, flags, value): 1552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Creates a control block that consists of objective channel id, opcode, 1562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis flags, encoded length of opcode specific value, and the value. 1572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Most of control blocks have this structure. 1582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 1602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id: objective channel id. 1612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode: opcode of the control block. 1622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis flags: 3bit opcode specific flags. 1632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis value: opcode specific data. 1642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 1652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id < 0 or channel_id > _MAX_CHANNEL_ID: 1672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid channel id: %d' % channel_id) 1682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if (opcode != _MUX_OPCODE_ADD_CHANNEL_REQUEST and 1692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode != _MUX_OPCODE_ADD_CHANNEL_RESPONSE and 1702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode != _MUX_OPCODE_DROP_CHANNEL): 1712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid opcode: %d' % opcode) 1722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if flags < 0 or flags > 7: 1732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid flags: %x' % flags) 1742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis length = len(value) 
1752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if length < 0 or length > 2 ** 32 - 1: 1762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid length: %d' % length) 1772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # The first byte consists of opcode, opcode specific flags, and size of 1792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # the size of value in bytes minus 1. 1802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis bytes_of_length = _size_of_number_in_bytes_minus_1(length) 1812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = (opcode << 5) | (flags << 2) | bytes_of_length 1822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoded_length = _encode_number(length) 1842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return (chr(first_byte) + _encode_channel_id(channel_id) + 1862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoded_length + value) 1872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1892da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _create_add_channel_response(channel_id, encoded_handshake, 1902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoding=0, rejected=False, 1912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis outer_frame_mask=False): 1922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if encoding != 0 and encoding != 1: 1932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid encoding %d' % encoding) 1942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 1952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis flags = (rejected << 2) | encoding 1962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block = _create_control_block_length_value( 
1972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id, _MUX_OPCODE_ADD_CHANNEL_RESPONSE, flags, encoded_handshake) 1982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block 1992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return create_binary_frame(payload, mask=outer_frame_mask) 2002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2022da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _create_drop_channel(channel_id, reason='', mux_error=False, 2032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis outer_frame_mask=False): 2042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not mux_error and len(reason) > 0: 2052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Reason must be empty if mux_error is False') 2062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis flags = mux_error << 2 2082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block = _create_control_block_length_value( 2092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id, _MUX_OPCODE_DROP_CHANNEL, flags, reason) 2102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block 2112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return create_binary_frame(payload, mask=outer_frame_mask) 2122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2142da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _create_flow_control(channel_id, replenished_quota, 2152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis outer_frame_mask=False): 2162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if replenished_quota < 0 or replenished_quota >= 2 ** 32: 2172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid quota: %d' % replenished_quota) 
2182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = ((_MUX_OPCODE_FLOW_CONTROL << 5) | 2192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _size_of_number_in_bytes_minus_1(replenished_quota)) 2202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = (_encode_channel_id(_CONTROL_CHANNEL_ID) + chr(first_byte) + 2212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _encode_channel_id(channel_id) + 2222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _encode_number(replenished_quota)) 2232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return create_binary_frame(payload, mask=outer_frame_mask) 2242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2262da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _create_new_channel_slot(slots, send_quota, outer_frame_mask=False): 2272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if slots < 0 or slots >= 2 ** 32: 2282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid number of slots: %d' % slots) 2292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if send_quota < 0 or send_quota >= 2 ** 32: 2302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Invalid send quota: %d' % send_quota) 2312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis slots_size = _size_of_number_in_bytes_minus_1(slots) 2322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis send_quota_size = _size_of_number_in_bytes_minus_1(send_quota) 2332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = ((_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) | 2352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (slots_size << 2) | send_quota_size) 2362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = (_encode_channel_id(_CONTROL_CHANNEL_ID) + chr(first_byte) + 2372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _encode_number(slots) + 
_encode_number(send_quota)) 2382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return create_binary_frame(payload, mask=outer_frame_mask) 2392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2412da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef _parse_request_text(request_text): 2422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis request_line, header_lines = request_text.split('\r\n', 1) 2432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis words = request_line.split(' ') 2452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(words) != 3: 2462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Bad Request-Line syntax %r' % request_line) 2472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis [command, path, version] = words 2482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if version != 'HTTP/1.1': 2492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ValueError('Bad request version %r' % version) 2502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # email.parser.Parser() parses RFC 2822 (RFC 822) style headers. 2522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # RFC 6455 refers RFC 2616 for handshake parsing, and RFC 2616 refers 2532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # RFC 822. 2542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis headers = email.parser.Parser().parsestr(header_lines) 2552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return command, path, version, headers 2562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2582da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _ControlBlock(object): 2592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """A structure that holds parsing result of multiplexing control block. 
2602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Control block specific attributes will be added by _MuxFramePayloadParser. 2612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (e.g. encoded_handshake will be added for AddChannelRequest and 2622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis AddChannelResponse) 2632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 2642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, opcode): 2662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.opcode = opcode 2672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2692da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _MuxFramePayloadParser(object): 2702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """A class that parses multiplexed frame payload.""" 2712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, payload): 2732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._data = payload 2742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position = 0 2752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger = util.get_class_logger(self) 2762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def read_channel_id(self): 2782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Reads channel id. 2792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Raises: 2812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis InvalidMuxFrameException: when the payload doesn't contain 2822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis valid channel id. 
2832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 2842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis remaining_length = len(self._data) - self._read_position 2862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos = self._read_position 2872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if remaining_length == 0: 2882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxFrameException('No channel id found') 2892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 2902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id = ord(self._data[pos]) 2912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id_length = 1 2922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id & 0xe0 == 0xe0: 2932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if remaining_length < 4: 2942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxFrameException( 2952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Invalid channel id format') 2962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id = struct.unpack('!L', 2972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._data[pos:pos+4])[0] & 0x1fffffff 2982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id_length = 4 2992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif channel_id & 0xc0 == 0xc0: 3002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if remaining_length < 3: 3012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxFrameException( 3022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Invalid channel id format') 3032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id = (((channel_id & 0x1f) << 16) + 3042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis struct.unpack('!H', self._data[pos+1:pos+3])[0]) 3052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id_length = 3 
3062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif channel_id & 0x80 == 0x80: 3072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if remaining_length < 2: 3082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxFrameException( 3092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Invalid channel id format') 3102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id = struct.unpack('!H', 3112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._data[pos:pos+2])[0] & 0x3fff 3122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id_length = 2 3132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position += channel_id_length 3142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return channel_id 3162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def read_inner_frame(self): 3182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Reads an inner frame. 3192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Raises: 3212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis InvalidMuxFrameException: when the inner frame is invalid. 
3222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 3232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(self._data) == self._read_position: 3252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxFrameException('No inner frame bits found') 3262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis bits = ord(self._data[self._read_position]) 3272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position += 1 3282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fin = (bits & 0x80) == 0x80 3292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis rsv1 = (bits & 0x40) == 0x40 3302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis rsv2 = (bits & 0x20) == 0x20 3312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis rsv3 = (bits & 0x10) == 0x10 3322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = bits & 0xf 3332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload = self.remaining_data() 3342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Consume rest of the message which is payload data of the original 3352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # frame. 
3362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position = len(self._data) 3372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return fin, rsv1, rsv2, rsv3, opcode, payload 3382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _read_number(self, size): 3402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._read_position + size > len(self._data): 3412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxControlBlock( 3422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Cannot read %d byte(s) number' % size) 3432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos = self._read_position 3452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if size == 1: 3462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position += 1 3472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return ord(self._data[pos]) 3482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif size == 2: 3492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position += 2 3502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.unpack('!H', self._data[pos:pos+2])[0] 3512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif size == 3: 3522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position += 3 3532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return ((ord(self._data[pos]) << 16) 3542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis + struct.unpack('!H', self._data[pos+1:pos+3])[0]) 3552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif size == 4: 3562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position += 4 3572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return struct.unpack('!L', self._data[pos:pos+4])[0] 3582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 3592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise 
InvalidMuxControlBlockException( 3602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Cannot read %d byte(s) number' % size) 3612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _read_opcode_specific_data(self, opcode, size_of_size): 3632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Reads opcode specific data that consists of followings: 3642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis - the size of the opcode specific data (1-4 bytes) 3652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis - the opcode specific data 3662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis AddChannelRequest and DropChannel have this structure. 3672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 3682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._read_position + size_of_size > len(self._data): 3702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxControlBlockException( 3712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'No size field for opcode %d' % opcode) 3722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size = self._read_number(size_of_size) 3742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pos = self._read_position 3762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if pos + size > len(self._data): 3772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxControlBlockException( 3782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'No data field for opcode %d (%d + %d > %d)' % 3792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (opcode, pos, size, len(self._data))) 3802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis specific_data = self._data[pos:pos+size] 
3822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position += size 3832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return specific_data 3842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _read_add_channel_request(self, first_byte, control_block): 3862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis reserved = (first_byte >> 4) & 0x1 3872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoding = (first_byte >> 2) & 0x3 3882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_handshake_size = (first_byte & 0x3) + 1 3892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control_block.channel_id = self.read_channel_id() 3912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoded_handshake = self._read_opcode_specific_data( 3922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _MUX_OPCODE_ADD_CHANNEL_REQUEST, 3932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_handshake_size) 3942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control_block.encoding = encoding 3952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control_block.encoded_handshake = encoded_handshake 3962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return control_block 3972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 3982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _read_flow_control(self, first_byte, control_block): 3992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis quota_size = (first_byte & 0x3) + 1 4002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control_block.channel_id = self.read_channel_id() 4012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control_block.send_quota = self._read_number(quota_size) 4022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return control_block 4032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4042da489cd246702bee5938545b18a6f710ed214bcJamie 
Gennis def _read_drop_channel(self, first_byte, control_block): 4052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_error = (first_byte >> 4) & 0x1 4062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis reserved = (first_byte >> 2) & 0x3 4072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_reason_size = (first_byte & 0x3) + 1 4082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control_block.channel_id = self.read_channel_id() 4102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis reason = self._read_opcode_specific_data( 4112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _MUX_OPCODE_ADD_CHANNEL_RESPONSE, 4122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis size_of_reason_size) 4132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if mux_error and len(reason) > 0: 4142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxControlBlockException( 4152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Reason must be empty when F bit is set') 4162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control_block.mux_error = mux_error 4172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control_block.reason = reason 4182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return control_block 4192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _read_new_channel_slot(self, first_byte, control_block): 4212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # TODO(bashi): Implement 4222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxNotImplementedException('NewChannelSlot is not implemented') 4232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def read_control_blocks(self): 4252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Reads control block(s). 
4262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Raises: 4282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis InvalidMuxControlBlock: when the payload contains invalid control 4292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block(s). 4302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis StopIteration: when no control blocks left. 4312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 4322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while self._read_position < len(self._data): 4342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._read_position >= len(self._data): 4352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxControlBlockException( 4362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'No control opcode found') 4372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = ord(self._data[self._read_position]) 4382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_position += 1 4392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = (first_byte >> 5) & 0x7 4402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control_block = _ControlBlock(opcode=opcode) 4412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST: 4422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis yield self._read_add_channel_request(first_byte, control_block) 4432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode == _MUX_OPCODE_FLOW_CONTROL: 4442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis yield self._read_flow_control(first_byte, control_block) 4452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode == _MUX_OPCODE_DROP_CHANNEL: 4462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis yield self._read_drop_channel(first_byte, control_block) 4472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode == 
_MUX_OPCODE_NEW_CHANNEL_SLOT: 4482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis yield self._read_new_channel_slot(first_byte, control_block) 4492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 4502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxControlBlockException( 4512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Invalid opcode %d' % opcode) 4522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis assert self._read_position == len(self._data) 4532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise StopIteration 4542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def remaining_data(self): 4562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Returns remaining data.""" 4572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return self._data[self._read_position:] 4592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4612da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _LogicalRequest(object): 4622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Mimics mod_python request.""" 4632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, channel_id, command, path, headers, connection): 4652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Constructs an instance. 4662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 4682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id: the channel id of the logical channel. 4692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis command: HTTP request command. 4702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis path: HTTP request path. 4712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis headers: HTTP headers. 
4722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis connection: _LogicalConnection instance. 4732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 4742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.channel_id = channel_id 4762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.method = command 4772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.uri = path 4782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.headers_in = headers 4792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.connection = connection 4802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.server_terminated = False 4812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.client_terminated = False 4822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def is_https(self): 4842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Mimics request.is_https(). Returns False because this method is 4852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis used only by old protocols (hixie and hybi00). 4862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 4872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return False 4892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4912da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _LogicalConnection(object): 4922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Mimics mod_python mp_conn.""" 4932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # For details, see the comment of set_read_state(). 
4952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis STATE_ACTIVE = 1 4962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis STATE_GRACEFULLY_CLOSED = 2 4972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis STATE_TERMINATED = 3 4982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 4992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, mux_handler, channel_id): 5002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Constructs an instance. 5012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 5032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_handler: _MuxHandler instance. 5042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id: channel id of this connection. 5052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 5062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler = mux_handler 5082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._channel_id = channel_id 5092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._incoming_data = '' 5102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_condition = threading.Condition() 5112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._waiting_write_completion = False 5122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition = threading.Condition() 5132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_state = self.STATE_ACTIVE 5142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def get_local_addr(self): 5162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Getter to mimic mp_conn.local_addr.""" 5172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return self._mux_handler.physical_connection.get_local_addr() 
5192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis local_addr = property(get_local_addr) 5202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def get_remote_addr(self): 5222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Getter to mimic mp_conn.remote_addr.""" 5232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return self._mux_handler.physical_connection.get_remote_addr() 5252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis remote_addr = property(get_remote_addr) 5262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def get_memorized_lines(self): 5282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Gets memorized lines. Not supported.""" 5292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException('_LogicalConnection does not support ' 5312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'get_memorized_lines') 5322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def write(self, data): 5342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Writes data. mux_handler sends data asynchronously. The caller will 5352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis be suspended until write done. 5362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 5382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis data: data to be written. 5392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Raises: 5412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis MuxUnexpectedException: when called before finishing the previous 5422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis write. 
5432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 5442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 5462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_condition.acquire() 5472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._waiting_write_completion: 5482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException( 5492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Logical connection %d is already waiting the completion ' 5502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'of write' % self._channel_id) 5512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._waiting_write_completion = True 5532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler.send_data(self._channel_id, data) 5542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_condition.wait() 5552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 5562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_condition.release() 5572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def write_control_data(self, data): 5592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Writes data via the control channel. Don't wait finishing write 5602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis because this method can be called by mux dispatcher. 5612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 5632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis data: data to be written. 
5642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 5652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler.send_control_data(data) 5672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def notify_write_done(self): 5692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Called when sending data is completed.""" 5702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 5722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_condition.acquire() 5732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not self._waiting_write_completion: 5742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException( 5752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Invalid call of notify_write_done for logical connection' 5762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis ' %d' % self._channel_id) 5772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._waiting_write_completion = False 5782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_condition.notify() 5792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 5802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_condition.release() 5812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def append_frame_data(self, frame_data): 5832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Appends incoming frame data. Called when mux_handler dispatches 5842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame data to the corresponding application. 5852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 5872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame_data: incoming frame data. 
5882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 5892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition.acquire() 5912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._incoming_data += frame_data 5922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition.notify() 5932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition.release() 5942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def read(self, length): 5962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Reads data. Blocks until enough data has arrived via physical 5972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis connection. 5982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 5992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 6002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis length: length of data to be read. 6012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Raises: 6022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis LogicalConnectionClosedException: when closing handshake for this 6032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical channel has been received. 6042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis ConnectionTerminatedException: when the physical connection has 6052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis closed, or an error is caused on the reader thread. 
6062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 6072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition.acquire() 6092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while (self._read_state == self.STATE_ACTIVE and 6102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis len(self._incoming_data) < length): 6112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition.wait() 6122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 6142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._read_state == self.STATE_GRACEFULLY_CLOSED: 6152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise LogicalConnectionClosedException( 6162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Logical channel %d has closed.' % self._channel_id) 6172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif self._read_state == self.STATE_TERMINATED: 6182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise ConnectionTerminatedException( 6192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Receiving %d byte failed. 
Logical channel (%d) closed' % 6202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (length, self._channel_id)) 6212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis value = self._incoming_data[:length] 6232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._incoming_data = self._incoming_data[length:] 6242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 6252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition.release() 6262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return value 6282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def set_read_state(self, new_state): 6302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Sets the state of this connection. Called when an event for this 6312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis connection has occurred. 6322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 6342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis new_state: state to be set. new_state must be one of followings: 6352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis - STATE_GRACEFULLY_CLOSED: when closing handshake for this 6362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis connection has been received. 6372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis - STATE_TERMINATED: when the physical connection has closed or 6382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis DropChannel of this connection has received. 
6392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 6402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition.acquire() 6422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_state = new_state 6432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition.notify() 6442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._read_condition.release() 6452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6472da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _LogicalStream(Stream): 6482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Mimics the Stream class. This class interprets multiplexed WebSocket 6492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frames. 6502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 6512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, request, send_quota, receive_quota): 6532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Constructs an instance. 6542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 6562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis request: _LogicalRequest instance. 6572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis send_quota: Initial send quota. 6582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis receive_quota: Initial receive quota. 6592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 6602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # TODO(bashi): Support frame filters. 6622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis stream_options = StreamOptions() 6632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Physical stream is responsible for masking. 
6642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis stream_options.unmask_receive = False 6652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Control frames can be fragmented on logical channel. 6662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis stream_options.allow_fragmented_control_frame = True 6672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Stream.__init__(self, request, stream_options) 6682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota = send_quota 6692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota_condition = threading.Condition() 6702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._receive_quota = receive_quota 6712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_inner_frame_semaphore = threading.Semaphore() 6722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _create_inner_frame(self, opcode, payload, end=True): 6742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # TODO(bashi): Support extensions that use reserved bits. 
6752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis first_byte = (end << 7) | opcode 6762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return (_encode_channel_id(self._request.channel_id) + 6772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis chr(first_byte) + payload) 6782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _write_inner_frame(self, opcode, payload, end=True): 6802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload_length = len(payload) 6812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis write_position = 0 6822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 6832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 6842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # An inner frame will be fragmented if there is no enough send 6852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # quota. This semaphore ensures that fragmented inner frames are 6862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # sent in order on the logical channel. 6872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Note that frames that come from other logical channels or 6882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # multiplexing control blocks can be inserted between fragmented 6892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # inner frames on the physical channel. 6902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_inner_frame_semaphore.acquire() 6912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while write_position < payload_length: 6922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 6932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota_condition.acquire() 6942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while self._send_quota == 0: 6952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug( 6962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'No quota. 
Waiting FlowControl message for %d.' % 6972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.channel_id) 6982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota_condition.wait() 6992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis remaining = payload_length - write_position 7012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis write_length = min(self._send_quota, remaining) 7022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis inner_frame_end = ( 7032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis end and 7042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (write_position + write_length == payload_length)) 7052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis inner_frame = self._create_inner_frame( 7072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode, 7082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis payload[write_position:write_position+write_length], 7092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis inner_frame_end) 7102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame_data = self._writer.build( 7112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis inner_frame, end=True, binary=True) 7122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota -= write_length 7132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Consumed quota=%d, remaining=%d' % 7142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (write_length, self._send_quota)) 7152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 7162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota_condition.release() 7172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Writing data will block the worker so we need to release 7192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # _send_quota_condition 
before writing. 7202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Sending inner frame: %r' % frame_data) 7212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.connection.write(frame_data) 7222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis write_position += write_length 7232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = common.OPCODE_CONTINUATION 7252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except ValueError, e: 7272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise BadOperationException(e) 7282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 7292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_inner_frame_semaphore.release() 7302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def replenish_send_quota(self, send_quota): 7322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Replenish send quota.""" 7332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota_condition.acquire() 7352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota += send_quota 7362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Replenished send quota for channel id %d: %d' % 7372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (self._request.channel_id, self._send_quota)) 7382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota_condition.notify() 7392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota_condition.release() 7402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def consume_receive_quota(self, amount): 7422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Consumes receive quota. 
Returns False on failure.""" 7432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._receive_quota < amount: 7452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Violate quota on channel id %d: %d < %d' % 7462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (self._request.channel_id, 7472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._receive_quota, amount)) 7482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return False 7492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._receive_quota -= amount 7502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return True 7512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def send_message(self, message, end=True, binary=False): 7532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Override Stream.send_message.""" 7542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._request.server_terminated: 7562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise BadOperationException( 7572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Requested send_message after sending out a closing handshake') 7582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if binary and isinstance(message, unicode): 7602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise BadOperationException( 7612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Message for binary frame must be instance of str') 7622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if binary: 7642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = common.OPCODE_BINARY 7652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 7662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = 
common.OPCODE_TEXT 7672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis message = message.encode('utf-8') 7682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_inner_frame(opcode, message, end) 7702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _receive_frame(self): 7722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Overrides Stream._receive_frame. 7732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis In addition to call Stream._receive_frame, this method adds the amount 7752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis of payload to receiving quota and sends FlowControl to the client. 7762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis We need to do it here because Stream.receive_message() handles 7772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis control frames internally. 
7782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 7792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self) 7812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis amount = len(payload) 7822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._receive_quota += amount 7832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame_data = _create_flow_control(self._request.channel_id, 7842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis amount) 7852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Sending flow control for %d, replenished=%d' % 7862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (self._request.channel_id, amount)) 7872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.connection.write_control_data(frame_data) 7882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return opcode, payload, fin, rsv1, rsv2, rsv3 7892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def receive_message(self): 7912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Overrides Stream.receive_message.""" 7922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 7932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Just call Stream.receive_message(), but catch 7942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # LogicalConnectionClosedException, which is raised when the logical 7952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # connection has closed gracefully. 
7962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 7972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return Stream.receive_message(self) 7982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except LogicalConnectionClosedException, e: 7992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('%s', e) 8002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return None 8012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _send_closing_handshake(self, code, reason): 8032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Overrides Stream._send_closing_handshake.""" 8042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis body = create_closing_handshake_body(code, reason) 8062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Sending closing handshake for %d: (%r, %r)' % 8072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (self._request.channel_id, code, reason)) 8082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_inner_frame(common.OPCODE_CLOSE, body, end=True) 8092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.server_terminated = True 8112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def send_ping(self, body=''): 8132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Overrides Stream.send_ping""" 8142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Sending ping on logical channel %d: %r' % 8162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (self._request.channel_id, body)) 8172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_inner_frame(common.OPCODE_PING, body, end=True) 8182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 
8192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._ping_queue.append(body) 8202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _send_pong(self, body): 8222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Overrides Stream._send_pong""" 8232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Sending pong on logical channel %d: %r' % 8252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (self._request.channel_id, body)) 8262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_inner_frame(common.OPCODE_PONG, body, end=True) 8272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''): 8292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Overrides Stream.close_connection.""" 8302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # TODO(bashi): Implement 8322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Closing logical connection %d' % 8332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.channel_id) 8342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.server_terminated = True 8352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _drain_received_data(self): 8372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Overrides Stream._drain_received_data. Nothing need to be done for 8382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical channel. 
8392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 8402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pass 8422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8442da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _OutgoingData(object): 8452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """A structure that holds data to be sent via physical connection and 8462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis origin of the data. 8472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 8482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, channel_id, data): 8502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.channel_id = channel_id 8512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.data = data 8522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8542da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _PhysicalConnectionWriter(threading.Thread): 8552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """A thread that is responsible for writing data to physical connection. 8562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis TODO(bashi): Make sure there is no thread-safety problem when the reader 8582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis thread reads data from the same socket at a time. 8592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 8602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, mux_handler): 8622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Constructs an instance. 
8632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 8652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_handler: _MuxHandler instance. 8662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 8672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis threading.Thread.__init__(self) 8692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger = util.get_class_logger(self) 8702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler = mux_handler 8712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.setDaemon(True) 8722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stop_requested = False 8732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque = collections.deque() 8742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition = threading.Condition() 8752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def put_outgoing_data(self, data): 8772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Puts outgoing data. 8782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 8802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis data: _OutgoingData instance. 8812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Raises: 8832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis BadOperationException: when the thread has been requested to 8842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis terminate. 
8852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 8862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 8882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.acquire() 8892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._stop_requested: 8902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise BadOperationException('Cannot write data anymore') 8912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque.append(data) 8932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.notify() 8942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 8952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.release() 8962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 8972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _write_data(self, outgoing_data): 8982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 8992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler.physical_connection.write(outgoing_data.data) 9002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except Exception, e: 9012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis util.prepend_message_to_exception( 9022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Failed to send message to %r: ' % 9032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (self._mux_handler.physical_connection.remote_addr,), e) 9042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise 9052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # TODO(bashi): It would be better to block the thread that sends 9072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # control data as well. 
9082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if outgoing_data.channel_id != _CONTROL_CHANNEL_ID: 9092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler.notify_write_done(outgoing_data.channel_id) 9102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def run(self): 9122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.acquire() 9132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while not self._stop_requested: 9142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if len(self._deque) == 0: 9152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.wait() 9162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis continue 9172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis outgoing_data = self._deque.popleft() 9192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.release() 9202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_data(outgoing_data) 9212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.acquire() 9222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Flush deque 9242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 9252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while len(self._deque) > 0: 9262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis outgoing_data = self._deque.popleft() 9272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._write_data(outgoing_data) 9282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 9292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.release() 9302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def stop(self): 9322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Stops the writer thread.""" 
9332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.acquire() 9352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._stop_requested = True 9362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.notify() 9372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._deque_condition.release() 9382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9402da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _PhysicalConnectionReader(threading.Thread): 9412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """A thread that is responsible for reading data from physical connection. 9422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 9432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, mux_handler): 9452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Constructs an instance. 9462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 9482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_handler: _MuxHandler instance. 
9492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 9502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis threading.Thread.__init__(self) 9522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger = util.get_class_logger(self) 9532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler = mux_handler 9542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.setDaemon(True) 9552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def run(self): 9572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while True: 9582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 9592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis physical_stream = self._mux_handler.physical_stream 9602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis message = physical_stream.receive_message() 9612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if message is None: 9622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis break 9632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = physical_stream.get_last_received_opcode() 9642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if opcode == common.OPCODE_TEXT: 9652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException( 9662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Received a text message on physical connection') 9672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except ConnectionTerminatedException, e: 9682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('%s', e) 9692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis break 9702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 9722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler.dispatch_message(message) 9732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except Exception, e: 
9742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug(traceback.format_exc()) 9752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis break 9762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler.notify_reader_done() 9782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9802da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _Worker(threading.Thread): 9812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """A thread that is responsible for running the corresponding application 9822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis handler. 9832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 9842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, mux_handler, request): 9862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Constructs an instance. 9872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 9892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_handler: _MuxHandler instance. 9902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis request: _LogicalRequest instance. 
9912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 9922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis threading.Thread.__init__(self) 9942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger = util.get_class_logger(self) 9952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler = mux_handler 9962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request = request 9972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.setDaemon(True) 9982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 9992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def run(self): 10002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Logical channel worker started. (id=%d)' % 10012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.channel_id) 10022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 10032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Non-critical exceptions will be handled by dispatcher. 10042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler.dispatcher.transfer_data(self._request) 10052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 10062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._mux_handler.notify_worker_done(self._request.channel_id) 10072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10092da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _MuxHandshaker(hybi.Handshaker): 10102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Opening handshake processor for multiplexing.""" 10112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, request, dispatcher, send_quota, receive_quota): 10132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Constructs an instance. 
10142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 10152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis request: _LogicalRequest instance. 10162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis dispatcher: Dispatcher instance (dispatch.Dispatcher). 10172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis send_quota: Initial send quota. 10182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis receive_quota: Initial receive quota. 10192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 10202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis hybi.Handshaker.__init__(self, request, dispatcher) 10222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_quota = send_quota 10232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._receive_quota = receive_quota 10242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _create_stream(self, stream_options): 10262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Override hybi.Handshaker._create_stream.""" 10272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Creating logical stream for %d' % 10292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.channel_id) 10302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return _LogicalStream(self._request, self._send_quota, 10312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._receive_quota) 10322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _send_handshake(self, accept): 10342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Override hybi.Handshaker._send_handshake.""" 10352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Don't send handshake response for the default channel 
10372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._request.channel_id == _DEFAULT_CHANNEL_ID: 10382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 10392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis handshake_response = self._create_handshake_response(accept) 10412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame_data = _create_add_channel_response( 10422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.channel_id, 10432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis handshake_response) 10442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Sending handshake response for %d: %r' % 10452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (self._request.channel_id, frame_data)) 10462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._request.connection.write_control_data(frame_data) 10472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10492da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _LogicalChannelData(object): 10502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """A structure that holds information about logical channel. 
10512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 10522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, request, worker): 10542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.request = request 10552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.worker = worker 10562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.mux_error_occurred = False 10572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.mux_error_reason = '' 10582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10602da489cd246702bee5938545b18a6f710ed214bcJamie Gennisclass _MuxHandler(object): 10612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Multiplexing handler. When a handler starts, it launches three 10622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis threads; the reader thread, the writer thread, and a worker thread. 10632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis The reader thread reads data from the physical stream, i.e., the 10652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis ws_stream object of the underlying websocket connection. The reader 10662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis thread interprets multiplexed frames and dispatches them to logical 10672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channels. Methods of this class are mostly called by the reader thread. 10682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis The writer thread sends multiplexed frames which are created by 10702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical channels via the physical connection. 
10712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis The worker thread launched at the starting point handles the 10732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis "Implicitly Opened Connection". If multiplexing handler receives 10742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis an AddChannelRequest and accepts it, the handler will launch a new worker 10752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis thread and dispatch the request to it. 10762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 10772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def __init__(self, request, dispatcher): 10792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Constructs an instance. 10802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 10822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis request: mod_python request of the physical connection. 10832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis dispatcher: Dispatcher instance (dispatch.Dispatcher). 
10842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 10852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.original_request = request 10872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.dispatcher = dispatcher 10882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.physical_connection = request.connection 10892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.physical_stream = request.ws_stream 10902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger = util.get_class_logger(self) 10912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels = {} 10922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition = threading.Condition() 10932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Holds client's initial quota 10942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._channel_slots = collections.deque() 10952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._worker_done_notify_received = False 10962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._reader = None 10972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._writer = None 10982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 10992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def start(self): 11002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Starts the handler. 11012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Raises: 11032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis MuxUnexpectedException: when the handler already started, or when 11042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opening handshake of the default channel fails. 
11052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 11062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._reader or self._writer: 11082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException('MuxHandler already started') 11092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._reader = _PhysicalConnectionReader(self) 11112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._writer = _PhysicalConnectionWriter(self) 11122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._reader.start() 11132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._writer.start() 11142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Create "Implicitly Opened Connection". 11162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID) 11172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis headers_in = copy.copy(self.original_request.headers_in) 11182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # TODO(bashi): Support extensions 11192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis headers_in['Sec-WebSocket-Extensions'] = '' 11202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical_request = _LogicalRequest(_DEFAULT_CHANNEL_ID, 11212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.original_request.method, 11222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.original_request.uri, 11232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis headers_in, 11242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical_connection) 11252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Client's send quota for the implicitly opened connection is zero, 11262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # but we will send FlowControl later so set the initial 
quota to 11272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # _INITIAL_QUOTA_FOR_CLIENT. 11282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT) 11292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not self._do_handshake_for_logical_request( 11302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical_request, send_quota=self.original_request.mux_quota): 11312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException( 11322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Failed handshake on the default channel id') 11332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._add_logical_channel(logical_request) 11342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Send FlowControl for the implicitly opened connection. 11362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID, 11372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _INITIAL_QUOTA_FOR_CLIENT) 11382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical_request.connection.write_control_data(frame_data) 11392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def add_channel_slots(self, slots, send_quota): 11412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Adds channel slots. 11422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 11442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis slots: number of slots to be added. 11452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis send_quota: initial send quota for slots. 
11462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 11472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._channel_slots.extend([send_quota] * slots) 11492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Send NewChannelSlot to client. 11502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame_data = _create_new_channel_slot(slots, send_quota) 11512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.send_control_data(frame_data) 11522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def wait_until_done(self, timeout=None): 11542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Waits until all workers are done. Returns False when timeout has 11552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis occurred. Returns True on success. 11562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 11582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis timeout: timeout in sec. 11592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 11602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 11622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 11632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis while len(self._logical_channels) > 0: 11642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Waiting workers(%d)...' 
% 11652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis len(self._logical_channels)) 11662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._worker_done_notify_received = False 11672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.wait(timeout) 11682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not self._worker_done_notify_received: 11692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Waiting worker(s) timed out') 11702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return False 11712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 11732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 11742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Flush pending outgoing data 11762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._writer.stop() 11772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._writer.join() 11782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return True 11802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def notify_write_done(self, channel_id): 11822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Called by the writer thread when a write operation has done. 11832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 11852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id: objective channel id. 
11862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 11872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 11892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 11902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id in self._logical_channels: 11912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data = self._logical_channels[channel_id] 11922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data.request.connection.notify_write_done() 11932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 11942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Seems that logical channel for %d has gone' 11952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis % channel_id) 11962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 11972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 11982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 11992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def send_control_data(self, data): 12002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Sends data via the control channel. 12012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 12032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis data: data to be sent. 
12042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 12052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._writer.put_outgoing_data(_OutgoingData( 12072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id=_CONTROL_CHANNEL_ID, data=data)) 12082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def send_data(self, channel_id, data): 12102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Sends data via given logical channel. This method is called by 12112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis worker threads. 12122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 12142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id: objective channel id. 12152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis data: data to be sent. 12162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 12172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._writer.put_outgoing_data(_OutgoingData( 12192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id=channel_id, data=data)) 12202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _send_drop_channel(self, channel_id, reason='', mux_error=False): 12222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame_data = _create_drop_channel(channel_id, reason, mux_error) 12232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug( 12242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Sending drop channel for channel id %d' % channel_id) 12252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.send_control_data(frame_data) 12262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def 
_send_error_add_channel_response(self, channel_id, status=None): 12282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if status is None: 12292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis status = common.HTTP_STATUS_BAD_REQUEST 12302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if status in _HTTP_BAD_RESPONSE_MESSAGES: 12322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis message = _HTTP_BAD_RESPONSE_MESSAGES[status] 12332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 12342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Response message for %d is not found' % status) 12352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis message = '???' 12362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message) 12382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame_data = _create_add_channel_response(channel_id, 12392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoded_handshake=response, 12402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis encoding=0, rejected=True) 12412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self.send_control_data(frame_data) 12422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _create_logical_request(self, block): 12442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if block.channel_id == _CONTROL_CHANNEL_ID: 12452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException( 12462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Received the control channel id (0) as objective channel ' 12472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'id for AddChannel') 12482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if block.encoding != 0: 
12502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxNotImplementedException( 12512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'delta-encoding not supported yet') 12522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis connection = _LogicalConnection(self, block.channel_id) 12532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis command, path, version, headers = _parse_request_text( 12542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.encoded_handshake) 12552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis request = _LogicalRequest(block.channel_id, command, path, 12562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis headers, connection) 12572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return request 12592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _do_handshake_for_logical_request(self, request, send_quota=0): 12612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 12622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis receive_quota = self._channel_slots.popleft() 12632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except IndexError: 12642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException('No room in channel pool') 12652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis handshaker = _MuxHandshaker(request, self.dispatcher, 12672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis send_quota, receive_quota) 12682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 12692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis handshaker.do_handshake() 12702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except handshake.VersionException, e: 12712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.info('%s', e) 12722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 
self._send_error_add_channel_response( 12732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) 12742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return False 12752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except handshake.HandshakeException, e: 12762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.info('%s', e) 12772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_error_add_channel_response(request.channel_id, 12782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis status=e.status) 12792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return False 12802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except handshake.AbortedByUserException, e: 12812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.info('%s', e) 12822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_error_add_channel_response(request.channel_id) 12832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return False 12842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return True 12862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 12872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _add_logical_channel(self, logical_request): 12882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 12892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 12902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if logical_request.channel_id in self._logical_channels: 12912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException('Channel id %d already exists' % 12922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical_request.channel_id) 12932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis worker = _Worker(self, logical_request) 12942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data = 
_LogicalChannelData(logical_request, worker) 12952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels[logical_request.channel_id] = channel_data 12962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis worker.start() 12972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 12982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 12992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 13002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _process_add_channel_request(self, block): 13012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 13022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis logical_request = self._create_logical_request(block) 13032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except ValueError, e: 13042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Failed to create logical request: %r' % e) 13052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_error_add_channel_response( 13062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) 13072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 13082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if self._do_handshake_for_logical_request(logical_request): 13092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._add_logical_channel(logical_request) 13102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 13112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_error_add_channel_response( 13122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) 13132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 13142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _process_flow_control(self, block): 13152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 13162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 
self._logical_channels_condition.acquire() 13172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not block.channel_id in self._logical_channels: 13182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 13192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data = self._logical_channels[block.channel_id] 13202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data.request.ws_stream.replenish_send_quota( 13212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis block.send_quota) 13222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 13232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 13242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 13252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _process_drop_channel(self, block): 13262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('DropChannel received for %d: reason=%r' % 13272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis (block.channel_id, block.reason)) 13282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 13292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 13302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not block.channel_id in self._logical_channels: 13312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 13322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data = self._logical_channels[block.channel_id] 13332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not block.mux_error: 13342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data.request.connection.set_read_state( 13352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _LogicalConnection.STATE_TERMINATED) 13362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 13372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # TODO(bashi): What should we do? 
13382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data.request.connection.set_read_state( 13392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _LogicalConnection.STATE_TERMINATED) 13402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 13412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 13422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 13432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _process_new_channel_slot(self, block): 13442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException('Client should not send NewChannelSlot') 13452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 13462da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _process_control_blocks(self, parser): 13472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis for control_block in parser.read_control_blocks(): 13482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis opcode = control_block.opcode 13492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('control block received, opcode: %d' % opcode) 13502da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST: 13512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._process_add_channel_request(control_block) 13522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode == _MUX_OPCODE_FLOW_CONTROL: 13532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._process_flow_control(control_block) 13542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode == _MUX_OPCODE_DROP_CHANNEL: 13552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._process_drop_channel(control_block) 13562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: 13572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._process_new_channel_slot(control_block) 13582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 
13592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise InvalidMuxControlBlockException( 13602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Invalid opcode') 13612da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 13622da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def _process_logical_frame(self, channel_id, parser): 13632da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Received a frame. channel id=%d' % channel_id) 13642da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 13652da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 13662da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not channel_id in self._logical_channels: 13672da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException( 13682da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Channel id %d not found' % channel_id) 13692da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data = self._logical_channels[channel_id] 13702da489cd246702bee5938545b18a6f710ed214bcJamie Gennis fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame() 13712da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not channel_data.request.ws_stream.consume_receive_quota( 13722da489cd246702bee5938545b18a6f710ed214bcJamie Gennis len(payload)): 13732da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # The client violates quota. Close logical channel. 
13742da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data.mux_error_occurred = True 13752da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data.mux_error_reason = 'Quota violation' 13762da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data.request.connection.set_read_state( 13772da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _LogicalConnection.STATE_TERMINATED) 13782da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return 13792da489cd246702bee5938545b18a6f710ed214bcJamie Gennis header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3, 13802da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mask=False) 13812da489cd246702bee5938545b18a6f710ed214bcJamie Gennis frame_data = header + payload 13822da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data.request.connection.append_frame_data(frame_data) 13832da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 13842da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 13852da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 13862da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def dispatch_message(self, message): 13872da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Dispatches message. The reader thread calls this method. 13882da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 13892da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 13902da489cd246702bee5938545b18a6f710ed214bcJamie Gennis message: a message that contains encapsulated frame. 13912da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Raises: 13922da489cd246702bee5938545b18a6f710ed214bcJamie Gennis InvalidMuxFrameException: if the message is invalid. 
13932da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 13942da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 13952da489cd246702bee5938545b18a6f710ed214bcJamie Gennis parser = _MuxFramePayloadParser(message) 13962da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id = parser.read_channel_id() 13972da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_id == _CONTROL_CHANNEL_ID: 13982da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._process_control_blocks(parser) 13992da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 14002da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._process_logical_frame(channel_id, parser) 14012da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14022da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def notify_worker_done(self, channel_id): 14032da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """Called when a worker has finished. 14042da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14052da489cd246702bee5938545b18a6f710ed214bcJamie Gennis Args: 14062da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id: channel id corresponded with the worker. 
14072da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 14082da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14092da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('Worker for channel id %d terminated' % channel_id) 14102da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 14112da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 14122da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not channel_id in self._logical_channels: 14132da489cd246702bee5938545b18a6f710ed214bcJamie Gennis raise MuxUnexpectedException( 14142da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 'Channel id %d not found' % channel_id) 14152da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data = self._logical_channels.pop(channel_id) 14162da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finally: 14172da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._worker_done_notify_received = True 14182da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.notify() 14192da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 14202da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14212da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if not channel_data.request.server_terminated: 14222da489cd246702bee5938545b18a6f710ed214bcJamie Gennis if channel_data.mux_error_occurred: 14232da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_drop_channel( 14242da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_id, reason=channel_data.mux_error_reason, 14252da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_error=True) 14262da489cd246702bee5938545b18a6f710ed214bcJamie Gennis else: 14272da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._send_drop_channel(channel_id) 14282da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14292da489cd246702bee5938545b18a6f710ed214bcJamie Gennis def notify_reader_done(self): 
14302da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """This method is called by the reader thread when the reader has 14312da489cd246702bee5938545b18a6f710ed214bcJamie Gennis finished. 14322da489cd246702bee5938545b18a6f710ed214bcJamie Gennis """ 14332da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14342da489cd246702bee5938545b18a6f710ed214bcJamie Gennis # Terminate all logical connections 14352da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logger.debug('termiating all logical connections...') 14362da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.acquire() 14372da489cd246702bee5938545b18a6f710ed214bcJamie Gennis for channel_data in self._logical_channels.values(): 14382da489cd246702bee5938545b18a6f710ed214bcJamie Gennis try: 14392da489cd246702bee5938545b18a6f710ed214bcJamie Gennis channel_data.request.connection.set_read_state( 14402da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _LogicalConnection.STATE_TERMINATED) 14412da489cd246702bee5938545b18a6f710ed214bcJamie Gennis except Exception: 14422da489cd246702bee5938545b18a6f710ed214bcJamie Gennis pass 14432da489cd246702bee5938545b18a6f710ed214bcJamie Gennis self._logical_channels_condition.release() 14442da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14452da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14462da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef use_mux(request): 14472da489cd246702bee5938545b18a6f710ed214bcJamie Gennis return hasattr(request, 'mux') and request.mux 14482da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14492da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14502da489cd246702bee5938545b18a6f710ed214bcJamie Gennisdef start(request, dispatcher): 14512da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_handler = _MuxHandler(request, dispatcher) 14522da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_handler.start() 14532da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 
14542da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_handler.add_channel_slots(_INITIAL_NUMBER_OF_CHANNEL_SLOTS, 14552da489cd246702bee5938545b18a6f710ed214bcJamie Gennis _INITIAL_QUOTA_FOR_CLIENT) 14562da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14572da489cd246702bee5938545b18a6f710ed214bcJamie Gennis mux_handler.wait_until_done() 14582da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14592da489cd246702bee5938545b18a6f710ed214bcJamie Gennis 14602da489cd246702bee5938545b18a6f710ed214bcJamie Gennis# vi:sts=4 sw=4 et 1461