1#!/usr/bin/env python 2# 3# Copyright 2012, Google Inc. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are 8# met: 9# 10# * Redistributions of source code must retain the above copyright 11# notice, this list of conditions and the following disclaimer. 12# * Redistributions in binary form must reproduce the above 13# copyright notice, this list of conditions and the following disclaimer 14# in the documentation and/or other materials provided with the 15# distribution. 16# * Neither the name of Google Inc. nor the names of its 17# contributors may be used to endorse or promote products derived from 18# this software without specific prior written permission. 19# 20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 22# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 23# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 24# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 25# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 26# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 30# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 32 33"""WebSocket client utility for testing mux extension. 34 35This code should be independent from mod_pywebsocket. See the comment of 36client_for_testing.py. 37 38NOTE: This code is far from robust like client_for_testing.py. 39""" 40 41 42 43import Queue 44import base64 45import collections 46import email 47import email.parser 48import logging 49import math 50import os 51import random 52import socket 53import struct 54import threading 55 56from mod_pywebsocket import util 57 58from test import client_for_testing 59 60 61_CONTROL_CHANNEL_ID = 0 62_DEFAULT_CHANNEL_ID = 1 63 64_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0 65_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1 66_MUX_OPCODE_FLOW_CONTROL = 2 67_MUX_OPCODE_DROP_CHANNEL = 3 68_MUX_OPCODE_NEW_CHANNEL_SLOT = 4 69 70 71class _ControlBlock: 72 def __init__(self, opcode): 73 self.opcode = opcode 74 75 76def _parse_handshake_response(response): 77 status_line, header_lines = response.split('\r\n', 1) 78 79 words = status_line.split(' ') 80 if len(words) < 3: 81 raise ValueError('Bad Status-Line syntax %r' % status_line) 82 [version, response_code] = words[:2] 83 if version != 'HTTP/1.1': 84 raise ValueError('Bad response version %r' % version) 85 86 if response_code != '101': 87 raise ValueError('Bad response code %r ' % response_code) 88 headers = email.parser.Parser().parsestr(header_lines) 89 return headers 90 91 92def _parse_channel_id(data, offset=0): 93 length = len(data) 94 remaining = length - offset 95 96 if remaining <= 0: 97 raise Exception('No channel id found') 98 99 channel_id = ord(data[offset]) 100 channel_id_length = 1 101 if channel_id & 0xe0 == 0xe0: 102 if remaining < 4: 103 raise Exception('Invalid channel id format') 104 channel_id = struct.unpack('!L', 105 data[offset:offset+4])[0] & 0x1fffffff 106 channel_id_length = 4 107 elif channel_id & 0xc0 == 0xc0: 108 if remaining < 3: 109 raise Exception('Invalid channel id format') 110 channel_id = (((channel_id & 0x1f) << 16) + 111 struct.unpack('!H', data[offset+1:offset+3])[0]) 112 channel_id_length = 3 113 elif channel_id & 0x80 == 0x80: 114 if remaining < 2: 115 raise Exception('Invalid channel id format') 116 channel_id = struct.unpack('!H', data[offset:offset+2])[0] & 0x3fff 117 channel_id_length = 2 118 119 return channel_id, channel_id_length 120 121 122def _read_number(data, size_of_size, offset=0): 123 if size_of_size == 1: 124 return ord(data[offset]) 125 elif size_of_size == 2: 126 return struct.unpack('!H', data[offset:offset+2])[0] 127 elif size_of_size == 3: 128 return ((ord(data[offset]) << 16) 129 + struct.unpack('!H', data[offset+1:offset+3])[0]) 130 elif size_of_size == 4: 131 return struct.unpack('!L', data[offset:offset+4])[0] 132 else: 133 raise Exception('Invalid "size of size" in control block') 134 135 136def _parse_control_block_specific_data(data, size_of_size, offset=0): 137 remaining = len(data) - offset 138 if remaining < size_of_size: 139 raise Exception('Invalid control block received') 140 141 size = _read_number(data, size_of_size, offset) 142 143 start_position = offset + size_of_size 144 end_position = start_position + size 145 if len(data) < end_position: 146 raise Exception('Invalid size of control block (%d < %d)' % ( 147 len(data), end_position)) 148 return data[start_position:end_position], size_of_size + size 149 150 151def _parse_control_blocks(data): 152 blocks = [] 153 length = len(data) 154 pos = 0 155 156 while pos < length: 157 first_byte = ord(data[pos]) 158 pos += 1 159 opcode = (first_byte >> 5) & 0x7 160 block = _ControlBlock(opcode) 161 162 # TODO(bashi): Support more opcode 163 if opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE: 164 block.encode = (first_byte >> 2) & 3 165 block.rejected = (first_byte >> 4) & 1 166 167 channel_id, advance = _parse_channel_id(data, pos) 168 block.channel_id = channel_id 169 pos += advance 170 171 size_of_size = (first_byte & 3) + 1 172 encoded_handshake, advance = _parse_control_block_specific_data( 173 data, size_of_size, pos) 174 block.encoded_handshake = encoded_handshake 175 pos += advance 176 blocks.append(block) 177 elif opcode == _MUX_OPCODE_DROP_CHANNEL: 178 block.mux_error = (first_byte >> 4) & 1 179 180 channel_id, channel_id_length = _parse_channel_id(data, pos) 181 block.channel_id = channel_id 182 pos += channel_id_length 183 184 size_of_size = first_byte & 3 185 reason, size = _parse_control_block_specific_data( 186 data, size_of_size, pos) 187 block.reason = reason 188 pos += size 189 blocks.append(block) 190 elif opcode == _MUX_OPCODE_FLOW_CONTROL: 191 channel_id, advance = _parse_channel_id(data, pos) 192 block.channel_id = channel_id 193 pos += advance 194 size_of_quota = (first_byte & 3) + 1 195 block.send_quota = _read_number(data, size_of_quota, pos) 196 pos += size_of_quota 197 blocks.append(block) 198 elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: 199 size_of_slots = ((first_byte >> 2) & 3) + 1 200 size_of_quota = (first_byte & 3) + 1 201 block.slots = _read_number(data, size_of_slots, pos) 202 pos += size_of_slots 203 block.send_quota = _read_number(data, size_of_quota, pos) 204 pos += size_of_quota 205 blocks.append(block) 206 else: 207 raise Exception( 208 'Unsupported mux opcode %d received' % opcode) 209 210 return blocks 211 212 213def _encode_channel_id(channel_id): 214 if channel_id < 0: 215 raise ValueError('Channel id %d must not be negative' % channel_id) 216 217 if channel_id < 2 ** 7: 218 return chr(channel_id) 219 if channel_id < 2 ** 14: 220 return struct.pack('!H', 0x8000 + channel_id) 221 if channel_id < 2 ** 21: 222 first = chr(0xc0 + (channel_id >> 16)) 223 return first + struct.pack('!H', channel_id & 0xffff) 224 if channel_id < 2 ** 29: 225 return struct.pack('!L', 0xe0000000 + channel_id) 226 227 raise ValueError('Channel id %d is too large' % channel_id) 228 229 230def _size_of_number_in_bytes_minus_1(number): 231 # Calculate the minimum number of bytes minus 1 that are required to store 232 # the data. 233 if number < 0: 234 raise ValueError('Invalid number: %d' % number) 235 elif number < 2 ** 8: 236 return 0 237 elif number < 2 ** 16: 238 return 1 239 elif number < 2 ** 24: 240 return 2 241 elif number < 2 ** 32: 242 return 3 243 else: 244 raise ValueError('Invalid number %d' % number) 245 246 247def _encode_number(number): 248 if number < 2 ** 8: 249 return chr(number) 250 elif number < 2 ** 16: 251 return struct.pack('!H', number) 252 elif number < 2 ** 24: 253 return chr(number >> 16) + struct.pack('!H', number & 0xffff) 254 else: 255 return struct.pack('!L', number) 256 257 258def _create_add_channel_request(channel_id, encoded_handshake, 259 encoding=0): 260 length = len(encoded_handshake) 261 size_of_length = _size_of_number_in_bytes_minus_1(length) 262 263 first_byte = ((_MUX_OPCODE_ADD_CHANNEL_REQUEST << 5) | (encoding << 2) | 264 size_of_length) 265 encoded_length = _encode_number(length) 266 267 return (chr(first_byte) + _encode_channel_id(channel_id) + 268 encoded_length + encoded_handshake) 269 270 271def _create_flow_control(channel_id, replenished_quota): 272 size_of_quota = _size_of_number_in_bytes_minus_1(replenished_quota) 273 first_byte = ((_MUX_OPCODE_FLOW_CONTROL << 5) | size_of_quota) 274 return (chr(first_byte) + _encode_channel_id(channel_id) + 275 _encode_number(replenished_quota)) 276 277 278class _MuxReaderThread(threading.Thread): 279 """Mux reader thread. 280 281 Reads frames and passes them to the mux client. This thread accesses 282 private functions/variables of the mux client. 283 """ 284 285 def __init__(self, mux): 286 threading.Thread.__init__(self) 287 self.setDaemon(True) 288 self._mux = mux 289 self._stop_requested = False 290 291 def _receive_message(self): 292 first_opcode = None 293 pending_payload = [] 294 while not self._stop_requested: 295 fin, rsv1, rsv2, rsv3, opcode, payload_length = ( 296 client_for_testing.read_frame_header(self._mux._socket)) 297 298 if not first_opcode: 299 if opcode == client_for_testing.OPCODE_TEXT: 300 raise Exception('Received a text message on physical ' 301 'connection') 302 if opcode == client_for_testing.OPCODE_CONTINUATION: 303 raise Exception('Received an intermediate frame but ' 304 'fragmentation was not started') 305 if (opcode == client_for_testing.OPCODE_BINARY or 306 opcode == client_for_testing.OPCODE_PONG or 307 opcode == client_for_testing.OPCODE_PONG or 308 opcode == client_for_testing.OPCODE_CLOSE): 309 first_opcode = opcode 310 else: 311 raise Exception('Received an undefined opcode frame: %d' % 312 opcode) 313 314 elif opcode != client_for_testing.OPCODE_CONTINUATION: 315 raise Exception('Received a new opcode before ' 316 'terminating fragmentation') 317 318 payload = client_for_testing.receive_bytes( 319 self._mux._socket, payload_length) 320 321 if self._mux._incoming_frame_filter is not None: 322 payload = self._mux._incoming_frame_filter.filter(payload) 323 324 pending_payload.append(payload) 325 326 if fin: 327 break 328 329 if self._stop_requested: 330 return None, None 331 332 message = ''.join(pending_payload) 333 return first_opcode, message 334 335 def request_stop(self): 336 self._stop_requested = True 337 338 def run(self): 339 try: 340 while not self._stop_requested: 341 # opcode is OPCODE_BINARY or control opcodes when a message 342 # is succesfully received. 343 opcode, message = self._receive_message() 344 if not opcode: 345 return 346 if opcode == client_for_testing.OPCODE_BINARY: 347 channel_id, advance = _parse_channel_id(message) 348 self._mux._dispatch_frame(channel_id, message[advance:]) 349 else: 350 self._mux._process_control_message(opcode, message) 351 finally: 352 self._mux._notify_reader_done() 353 354 355class _InnerFrame(object): 356 def __init__(self, fin, rsv1, rsv2, rsv3, opcode, payload): 357 self.fin = fin 358 self.rsv1 = rsv1 359 self.rsv2 = rsv2 360 self.rsv3 = rsv3 361 self.opcode = opcode 362 self.payload = payload 363 364 365class _LogicalChannelData(object): 366 def __init__(self): 367 self.queue = Queue.Queue() 368 self.send_quota = 0 369 self.receive_quota = 0 370 371 372class MuxClient(object): 373 """WebSocket mux client. 374 375 Note that this class is NOT thread-safe. Do not access an instance of this 376 class from multiple threads at a same time. 377 """ 378 379 def __init__(self, options): 380 self._logger = util.get_class_logger(self) 381 382 self._options = options 383 self._options.enable_mux() 384 self._stream = None 385 self._socket = None 386 self._handshake = client_for_testing.WebSocketHandshake(self._options) 387 self._incoming_frame_filter = None 388 self._outgoing_frame_filter = None 389 390 self._is_active = False 391 self._read_thread = None 392 self._control_blocks_condition = threading.Condition() 393 self._control_blocks = [] 394 self._channel_slots = collections.deque() 395 self._logical_channels_condition = threading.Condition(); 396 self._logical_channels = {} 397 self._timeout = 2 398 self._physical_connection_close_event = None 399 self._physical_connection_close_message = None 400 401 def _parse_inner_frame(self, data): 402 if len(data) == 0: 403 raise Exception('Invalid encapsulated frame received') 404 405 first_byte = ord(data[0]) 406 fin = (first_byte << 7) & 1 407 rsv1 = (first_byte << 6) & 1 408 rsv2 = (first_byte << 5) & 1 409 rsv3 = (first_byte << 4) & 1 410 opcode = first_byte & 0xf 411 412 if self._outgoing_frame_filter: 413 payload = self._outgoing_frame_filter.filter( 414 data[1:]) 415 else: 416 payload = data[1:] 417 418 return _InnerFrame(fin, rsv1, rsv2, rsv3, opcode, payload) 419 420 def _process_mux_control_blocks(self): 421 for block in self._control_blocks: 422 if block.opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE: 423 # AddChannelResponse will be handled in add_channel(). 424 continue 425 elif block.opcode == _MUX_OPCODE_FLOW_CONTROL: 426 try: 427 self._logical_channels_condition.acquire() 428 if not block.channel_id in self._logical_channels: 429 raise Exception('Invalid flow control received for ' 430 'channel id %d' % block.channel_id) 431 self._logical_channels[block.channel_id].send_quota += ( 432 block.send_quota) 433 self._logical_channels_condition.notify() 434 finally: 435 self._logical_channels_condition.release() 436 elif block.opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: 437 self._channel_slots.extend([block.send_quota] * block.slots) 438 439 def _dispatch_frame(self, channel_id, payload): 440 if channel_id == _CONTROL_CHANNEL_ID: 441 try: 442 self._control_blocks_condition.acquire() 443 self._control_blocks += _parse_control_blocks(payload) 444 self._process_mux_control_blocks() 445 self._control_blocks_condition.notify() 446 finally: 447 self._control_blocks_condition.release() 448 else: 449 try: 450 self._logical_channels_condition.acquire() 451 if not channel_id in self._logical_channels: 452 raise Exception('Received logical frame on channel id ' 453 '%d, which is not established' % 454 channel_id) 455 456 inner_frame = self._parse_inner_frame(payload) 457 self._logical_channels[channel_id].receive_quota -= ( 458 len(inner_frame.payload)) 459 if self._logical_channels[channel_id].receive_quota < 0: 460 raise Exception('The server violates quota on ' 461 'channel id %d' % channel_id) 462 finally: 463 self._logical_channels_condition.release() 464 self._logical_channels[channel_id].queue.put(inner_frame) 465 466 def _process_control_message(self, opcode, message): 467 # Ping/Pong are not supported. 468 if opcode == client_for_testing.OPCODE_CLOSE: 469 self._physical_connection_close_message = message 470 if self._is_active: 471 self._stream.send_close( 472 code=client_for_testing.STATUS_NORMAL_CLOSURE, reason='') 473 self._read_thread.request_stop() 474 475 if self._physical_connection_close_event: 476 self._physical_connection_close_event.set() 477 478 def _notify_reader_done(self): 479 self._logger.debug('Read thread terminated.') 480 self.close_socket() 481 482 def _assert_channel_slot_available(self): 483 try: 484 self._control_blocks_condition.acquire() 485 if len(self._channel_slots) == 0: 486 # Wait once 487 self._control_blocks_condition.wait(timeout=self._timeout) 488 finally: 489 self._control_blocks_condition.release() 490 491 if len(self._channel_slots) == 0: 492 raise Exception('Failed to receive NewChannelSlot') 493 494 def _assert_send_quota_available(self, channel_id): 495 try: 496 self._logical_channels_condition.acquire() 497 if self._logical_channels[channel_id].send_quota == 0: 498 # Wait once 499 self._logical_channels_condition.wait(timeout=self._timeout) 500 finally: 501 self._logical_channels_condition.release() 502 503 if self._logical_channels[channel_id].send_quota == 0: 504 raise Exception('Failed to receive FlowControl for channel id %d' % 505 channel_id) 506 507 def connect(self): 508 self._socket = socket.socket() 509 self._socket.settimeout(self._options.socket_timeout) 510 511 self._socket.connect((self._options.server_host, 512 self._options.server_port)) 513 if self._options.use_tls: 514 self._socket = _TLSSocket(self._socket) 515 516 self._handshake.handshake(self._socket) 517 self._stream = client_for_testing.WebSocketStream( 518 self._socket, self._handshake) 519 520 self._logical_channels[_DEFAULT_CHANNEL_ID] = _LogicalChannelData() 521 522 self._read_thread = _MuxReaderThread(self) 523 self._read_thread.start() 524 525 self._assert_channel_slot_available() 526 self._assert_send_quota_available(_DEFAULT_CHANNEL_ID) 527 528 self._is_active = True 529 self._logger.info('Connection established') 530 531 def add_channel(self, channel_id, options): 532 if not self._is_active: 533 raise Exception('Mux client is not active') 534 535 if channel_id in self._logical_channels: 536 raise Exception('Channel id %d already exists' % channel_id) 537 538 try: 539 send_quota = self._channel_slots.popleft() 540 except IndexError, e: 541 raise Exception('No channel slots: %r' % e) 542 543 # Create AddChannel request 544 request_line = 'GET %s HTTP/1.1\r\n' % options.resource 545 fields = [] 546 fields.append('Upgrade: websocket\r\n') 547 fields.append('Connection: Upgrade\r\n') 548 if options.server_port == client_for_testing.DEFAULT_PORT: 549 fields.append('Host: %s\r\n' % options.server_host.lower()) 550 else: 551 fields.append('Host: %s:%d\r\n' % (options.server_host.lower(), 552 options.server_port)) 553 fields.append('Origin: %s\r\n' % options.origin.lower()) 554 555 original_key = os.urandom(16) 556 key = base64.b64encode(original_key) 557 fields.append('Sec-WebSocket-Key: %s\r\n' % key) 558 559 fields.append('Sec-WebSocket-Version: 13\r\n') 560 561 if len(options.extensions) > 0: 562 fields.append('Sec-WebSocket-Extensions: %s\r\n' % 563 ', '.join(options.extensions)) 564 565 handshake = request_line + ''.join(fields) + '\r\n' 566 add_channel_request = _create_add_channel_request( 567 channel_id, handshake) 568 payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + add_channel_request 569 self._stream.send_binary(payload) 570 571 # Wait AddChannelResponse 572 self._logger.debug('Waiting AddChannelResponse for the request...') 573 response = None 574 try: 575 self._control_blocks_condition.acquire() 576 while True: 577 for block in self._control_blocks: 578 if block.opcode != _MUX_OPCODE_ADD_CHANNEL_RESPONSE: 579 continue 580 if block.channel_id == channel_id: 581 response = block 582 self._control_blocks.remove(response) 583 break 584 if response: 585 break 586 self._control_blocks_condition.wait(self._timeout) 587 if not self._is_active: 588 raise Exception('AddChannelRequest timed out') 589 finally: 590 self._control_blocks_condition.release() 591 592 # Validate AddChannelResponse 593 if response.rejected: 594 raise Exception('The server rejected AddChannelRequest') 595 596 fields = _parse_handshake_response(response.encoded_handshake) 597 598 if not 'upgrade' in fields: 599 raise Exception('No Upgrade header') 600 if fields['upgrade'] != 'websocket': 601 raise Exception('Wrong Upgrade header') 602 if not 'connection' in fields: 603 raise Exception('No Connection header') 604 if fields['connection'] != 'Upgrade': 605 raise Exception('Wrong Connection header') 606 if not 'sec-websocket-accept' in fields: 607 raise Exception('No Sec-WebSocket-Accept header') 608 609 accept = fields['sec-websocket-accept'] 610 try: 611 decoded_accept = base64.b64decode(accept) 612 except TypeError, e: 613 raise Exception( 614 'Illegal value for header Sec-WebSocket-Accept: ' + accept) 615 616 if len(decoded_accept) != 20: 617 raise Exception( 618 'Decoded value of Sec-WebSocket-Accept is not 20-byte long') 619 620 original_expected_accept = util.sha1_hash( 621 key + client_for_testing.WEBSOCKET_ACCEPT_UUID).digest() 622 expected_accept = base64.b64encode(original_expected_accept) 623 624 if accept != expected_accept: 625 raise Exception( 626 'Invalid Sec-WebSocket-Accept header: %r (expected) != %r ' 627 '(actual)' % (accept, expected_accept)) 628 629 self._logical_channels_condition.acquire() 630 self._logical_channels[channel_id] = _LogicalChannelData() 631 self._logical_channels[channel_id].send_quota = send_quota 632 self._logical_channels_condition.release() 633 634 self._logger.debug('Logical channel %d established' % channel_id) 635 636 def _check_logical_channel_is_opened(self, channel_id): 637 if not self._is_active: 638 raise Exception('Mux client is not active') 639 640 if not channel_id in self._logical_channels: 641 raise Exception('Logical channel %d is not established.') 642 643 def drop_channel(self, channel_id): 644 # TODO(bashi): Implement 645 pass 646 647 def send_flow_control(self, channel_id, replenished_quota): 648 self._check_logical_channel_is_opened(channel_id) 649 flow_control = _create_flow_control(channel_id, replenished_quota) 650 payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + flow_control 651 # Replenish receive quota 652 try: 653 self._logical_channels_condition.acquire() 654 self._logical_channels[channel_id].receive_quota += ( 655 replenished_quota) 656 finally: 657 self._logical_channels_condition.release() 658 self._stream.send_binary(payload) 659 660 def send_message(self, channel_id, message, end=True, binary=False): 661 self._check_logical_channel_is_opened(channel_id) 662 663 if binary: 664 first_byte = (end << 7) | client_for_testing.OPCODE_BINARY 665 else: 666 first_byte = (end << 7) | client_for_testing.OPCODE_TEXT 667 message = message.encode('utf-8') 668 669 try: 670 self._logical_channels_condition.acquire() 671 if self._logical_channels[channel_id].send_quota < len(message): 672 raise Exception('Send quota violation: %d < %d' % ( 673 self._logical_channels[channel_id].send_quota, 674 len(message))) 675 676 self._logical_channels[channel_id].send_quota -= len(message) 677 finally: 678 self._logical_channels_condition.release() 679 payload = _encode_channel_id(channel_id) + chr(first_byte) + message 680 self._stream.send_binary(payload) 681 682 def assert_receive(self, channel_id, payload, binary=False): 683 self._check_logical_channel_is_opened(channel_id) 684 685 try: 686 inner_frame = self._logical_channels[channel_id].queue.get( 687 timeout=self._timeout) 688 except Queue.Empty, e: 689 raise Exception('Cannot receive message from channel id %d' % 690 channel_id) 691 692 if binary: 693 opcode = client_for_testing.OPCODE_BINARY 694 else: 695 opcode = client_for_testing.OPCODE_TEXT 696 697 if inner_frame.opcode != opcode: 698 raise Exception('Unexpected opcode received (%r != %r)' % 699 (expected_opcode, inner_frame.opcode)) 700 701 if inner_frame.payload != payload: 702 raise Exception('Unexpected payload received') 703 704 def send_close(self, channel_id, code=None, reason=''): 705 self._check_logical_channel_is_opened(channel_id) 706 707 if code is not None: 708 body = struct.pack('!H', code) + reason.encode('utf-8') 709 else: 710 body = '' 711 712 first_byte = (1 << 7) | client_for_testing.OPCODE_CLOSE 713 payload = _encode_channel_id(channel_id) + chr(first_byte) + body 714 self._stream.send_binary(payload) 715 716 def assert_receive_close(self, channel_id): 717 self._check_logical_channel_is_opened(channel_id) 718 719 try: 720 inner_frame = self._logical_channels[channel_id].queue.get( 721 timeout=self._timeout) 722 except Queue.Empty, e: 723 raise Exception('Cannot receive message from channel id %d' % 724 channel_id) 725 if inner_frame.opcode != client_for_testing.OPCODE_CLOSE: 726 raise Exception('Didn\'t receive close frame') 727 728 def send_physical_connection_close(self, code=None, reason=''): 729 self._physical_connection_close_event = threading.Event() 730 self._stream.send_close(code, reason) 731 732 # This method can be used only after calling 733 # send_physical_connection_close(). 734 def assert_physical_connection_receive_close( 735 self, code=client_for_testing.STATUS_NORMAL_CLOSURE, reason=''): 736 self._physical_connection_close_event.wait(timeout=self._timeout) 737 if (not self._physical_connection_close_event.isSet() or 738 not self._physical_connection_close_message): 739 raise Exception('Didn\'t receive closing handshake') 740 741 def close_socket(self): 742 self._is_active = False 743 self._socket.close() 744