1#!/usr/bin/env python
2#
3# Copyright 2012, Google Inc.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10#     * Redistributions of source code must retain the above copyright
11# notice, this list of conditions and the following disclaimer.
12#     * Redistributions in binary form must reproduce the above
13# copyright notice, this list of conditions and the following disclaimer
14# in the documentation and/or other materials provided with the
15# distribution.
16#     * Neither the name of Google Inc. nor the names of its
17# contributors may be used to endorse or promote products derived from
18# this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32
33"""WebSocket client utility for testing mux extension.
34
35This code should be independent from mod_pywebsocket. See the comment of
36client_for_testing.py.
37
38NOTE: This code is far from robust like client_for_testing.py.
39"""
40
41
42
43import Queue
44import base64
45import collections
46import email
47import email.parser
48import logging
49import math
50import os
51import random
52import socket
53import struct
54import threading
55
56from mod_pywebsocket import util
57
58from test import client_for_testing
59
60
# Frames whose channel id is _CONTROL_CHANNEL_ID carry mux control blocks
# (see MuxClient._dispatch_frame). _DEFAULT_CHANNEL_ID is the logical channel
# that MuxClient.connect() registers for the physical connection.
_CONTROL_CHANNEL_ID = 0
_DEFAULT_CHANNEL_ID = 1

# Opcodes of mux control blocks. The opcode occupies the upper three bits of
# the first byte of a control block (see _parse_control_blocks).
_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
_MUX_OPCODE_FLOW_CONTROL = 2
_MUX_OPCODE_DROP_CHANNEL = 3
_MUX_OPCODE_NEW_CHANNEL_SLOT = 4
69
70
71class _ControlBlock:
72    def __init__(self, opcode):
73        self.opcode = opcode
74
75
76def _parse_handshake_response(response):
77    status_line, header_lines = response.split('\r\n', 1)
78
79    words = status_line.split(' ')
80    if len(words) < 3:
81        raise ValueError('Bad Status-Line syntax %r' % status_line)
82    [version, response_code] = words[:2]
83    if version != 'HTTP/1.1':
84        raise ValueError('Bad response version %r' % version)
85
86    if response_code != '101':
87        raise ValueError('Bad response code %r ' % response_code)
88    headers = email.parser.Parser().parsestr(header_lines)
89    return headers
90
91
92def _parse_channel_id(data, offset=0):
93    length = len(data)
94    remaining = length - offset
95
96    if remaining <= 0:
97        raise Exception('No channel id found')
98
99    channel_id = ord(data[offset])
100    channel_id_length = 1
101    if channel_id & 0xe0 == 0xe0:
102        if remaining < 4:
103            raise Exception('Invalid channel id format')
104        channel_id = struct.unpack('!L',
105                                   data[offset:offset+4])[0] & 0x1fffffff
106        channel_id_length = 4
107    elif channel_id & 0xc0 == 0xc0:
108        if remaining < 3:
109            raise Exception('Invalid channel id format')
110        channel_id = (((channel_id & 0x1f) << 16) +
111                      struct.unpack('!H', data[offset+1:offset+3])[0])
112        channel_id_length = 3
113    elif channel_id & 0x80 == 0x80:
114        if remaining < 2:
115            raise Exception('Invalid channel id format')
116        channel_id = struct.unpack('!H', data[offset:offset+2])[0] & 0x3fff
117        channel_id_length = 2
118
119    return channel_id, channel_id_length
120
121
122def _read_number(data, size_of_size, offset=0):
123    if size_of_size == 1:
124        return ord(data[offset])
125    elif size_of_size == 2:
126        return struct.unpack('!H', data[offset:offset+2])[0]
127    elif size_of_size == 3:
128        return ((ord(data[offset]) << 16)
129                + struct.unpack('!H', data[offset+1:offset+3])[0])
130    elif size_of_size == 4:
131        return struct.unpack('!L', data[offset:offset+4])[0]
132    else:
133        raise Exception('Invalid "size of size" in control block')
134
135
def _parse_control_block_specific_data(data, size_of_size, offset=0):
    """Extracts a length prefixed field from a control block.

    The field consists of a size_of_size byte length (read with
    _read_number) followed by that many bytes of content.

    Args:
        data: byte string holding the control block.
        size_of_size: number of bytes used by the length prefix.
        offset: position in data where the length prefix starts.

    Returns:
        A tuple of (content, total number of bytes consumed including the
        length prefix).

    Raises:
        Exception: when data is too short for the prefix or the content.
    """
    total_length = len(data)
    if total_length - offset < size_of_size:
        raise Exception('Invalid control block received')

    content_length = _read_number(data, size_of_size, offset)

    content_start = offset + size_of_size
    content_end = content_start + content_length
    if total_length < content_end:
        raise Exception('Invalid size of control block (%d < %d)' % (
                total_length, content_end))

    consumed = size_of_size + content_length
    return data[content_start:content_end], consumed
149
150
def _parse_control_blocks(data):
    """Parses the payload of a control channel frame into control blocks.

    Args:
        data: byte string holding zero or more concatenated control blocks.

    Returns:
        A list of _ControlBlock objects in the order they appear in data.
        In addition to opcode, the following attributes are set:
            AddChannelResponse: encode, rejected, channel_id,
                encoded_handshake
            DropChannel: mux_error, channel_id, reason
            FlowControl: channel_id, send_quota
            NewChannelSlot: slots, send_quota

    Raises:
        Exception: when an unsupported opcode is found or a block is
            malformed or truncated.
    """
    blocks = []
    length = len(data)
    pos = 0

    while pos < length:
        first_byte = ord(data[pos])
        pos += 1
        # The opcode is stored in the upper three bits of the first byte.
        opcode = (first_byte >> 5) & 0x7
        block = _ControlBlock(opcode)

        # TODO(bashi): Support more opcode
        if opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
            block.encode = (first_byte >> 2) & 3
            block.rejected = (first_byte >> 4) & 1

            channel_id, advance = _parse_channel_id(data, pos)
            block.channel_id = channel_id
            pos += advance

            # Sizes are stored as "number of bytes minus 1".
            size_of_size = (first_byte & 3) + 1
            encoded_handshake, advance = _parse_control_block_specific_data(
                data, size_of_size, pos)
            block.encoded_handshake = encoded_handshake
            pos += advance
        elif opcode == _MUX_OPCODE_DROP_CHANNEL:
            block.mux_error = (first_byte >> 4) & 1

            channel_id, advance = _parse_channel_id(data, pos)
            block.channel_id = channel_id
            pos += advance

            # Like every other size field in this module the reason size is
            # stored minus 1. Without the "+ 1" a one byte length prefix
            # (encoded as 0) was rejected as an invalid size.
            size_of_size = (first_byte & 3) + 1
            reason, advance = _parse_control_block_specific_data(
                data, size_of_size, pos)
            block.reason = reason
            pos += advance
        elif opcode == _MUX_OPCODE_FLOW_CONTROL:
            channel_id, advance = _parse_channel_id(data, pos)
            block.channel_id = channel_id
            pos += advance

            size_of_quota = (first_byte & 3) + 1
            block.send_quota = _read_number(data, size_of_quota, pos)
            pos += size_of_quota
        elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
            size_of_slots = ((first_byte >> 2) & 3) + 1
            size_of_quota = (first_byte & 3) + 1

            block.slots = _read_number(data, size_of_slots, pos)
            pos += size_of_slots
            block.send_quota = _read_number(data, size_of_quota, pos)
            pos += size_of_quota
        else:
            raise Exception(
                'Unsupported mux opcode %d received' % opcode)

        blocks.append(block)

    return blocks
211
212
213def _encode_channel_id(channel_id):
214    if channel_id < 0:
215        raise ValueError('Channel id %d must not be negative' % channel_id)
216
217    if channel_id < 2 ** 7:
218        return chr(channel_id)
219    if channel_id < 2 ** 14:
220        return struct.pack('!H', 0x8000 + channel_id)
221    if channel_id < 2 ** 21:
222        first = chr(0xc0 + (channel_id >> 16))
223        return first + struct.pack('!H', channel_id & 0xffff)
224    if channel_id < 2 ** 29:
225        return struct.pack('!L', 0xe0000000 + channel_id)
226
227    raise ValueError('Channel id %d is too large' % channel_id)
228
229
230def _size_of_number_in_bytes_minus_1(number):
231    # Calculate the minimum number of bytes minus 1 that are required to store
232    # the data.
233    if number < 0:
234        raise ValueError('Invalid number: %d' % number)
235    elif number < 2 ** 8:
236        return 0
237    elif number < 2 ** 16:
238        return 1
239    elif number < 2 ** 24:
240        return 2
241    elif number < 2 ** 32:
242        return 3
243    else:
244        raise ValueError('Invalid number %d' % number)
245
246
247def _encode_number(number):
248    if number < 2 ** 8:
249        return chr(number)
250    elif number < 2 ** 16:
251        return struct.pack('!H', number)
252    elif number < 2 ** 24:
253        return chr(number >> 16) + struct.pack('!H', number & 0xffff)
254    else:
255        return struct.pack('!L', number)
256
257
def _create_add_channel_request(channel_id, encoded_handshake,
                                encoding=0):
    """Builds an AddChannelRequest control block.

    Args:
        channel_id: id of the logical channel to be added.
        encoded_handshake: the opening handshake to embed in the block.
        encoding: value stored in bits 2-3 of the first byte.

    Returns:
        The control block: first byte, encoded channel id, encoded
        handshake length and the handshake itself.
    """
    handshake_length = len(encoded_handshake)
    length_size_minus_1 = _size_of_number_in_bytes_minus_1(handshake_length)

    header = (_MUX_OPCODE_ADD_CHANNEL_REQUEST << 5) | (encoding << 2)
    header |= length_size_minus_1
    length_field = _encode_number(handshake_length)

    block = chr(header) + _encode_channel_id(channel_id)
    return block + length_field + encoded_handshake
269
270
def _create_flow_control(channel_id, replenished_quota):
    """Builds a FlowControl control block.

    Args:
        channel_id: id of the logical channel whose quota is replenished.
        replenished_quota: quota to announce; must fit in 4 bytes.

    Returns:
        The control block: first byte, encoded channel id and the quota
        encoded in the minimum number of bytes.
    """
    quota_size_minus_1 = _size_of_number_in_bytes_minus_1(replenished_quota)
    header = chr((_MUX_OPCODE_FLOW_CONTROL << 5) | quota_size_minus_1)
    block = header + _encode_channel_id(channel_id)
    return block + _encode_number(replenished_quota)
276
277
class _MuxReaderThread(threading.Thread):
    """Mux reader thread.

    Reads frames from the physical connection and passes them to the mux
    client. This thread accesses private functions/variables of the mux
    client.
    """

    def __init__(self, mux):
        """Creates a daemon thread reading on behalf of mux.

        Args:
            mux: the MuxClient owning the socket to read from.
        """
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self._mux = mux
        self._stop_requested = False

    def _receive_message(self):
        """Reads frames until a complete (possibly fragmented) message.

        Returns:
            A tuple of (opcode of the first frame, concatenated payload), or
            (None, None) when a stop was requested.

        Raises:
            Exception: when a text frame, an unexpected continuation frame,
                an undefined opcode or a new opcode inside a fragmented
                message is received.
        """
        first_opcode = None
        pending_payload = []
        while not self._stop_requested:
            fin, rsv1, rsv2, rsv3, opcode, payload_length = (
                client_for_testing.read_frame_header(self._mux._socket))

            if first_opcode is None:
                if opcode == client_for_testing.OPCODE_TEXT:
                    raise Exception('Received a text message on physical '
                                    'connection')
                if opcode == client_for_testing.OPCODE_CONTINUATION:
                    raise Exception('Received an intermediate frame but '
                                    'fragmentation was not started')
                # OPCODE_PONG used to be listed twice here, so a ping frame
                # was reported as an undefined opcode.
                if (opcode == client_for_testing.OPCODE_BINARY or
                    opcode == client_for_testing.OPCODE_PING or
                    opcode == client_for_testing.OPCODE_PONG or
                    opcode == client_for_testing.OPCODE_CLOSE):
                    first_opcode = opcode
                else:
                    raise Exception('Received an undefined opcode frame: %d' %
                                    opcode)

            elif opcode != client_for_testing.OPCODE_CONTINUATION:
                raise Exception('Received a new opcode before '
                                'terminating fragmentation')

            payload = client_for_testing.receive_bytes(
                self._mux._socket, payload_length)

            if self._mux._incoming_frame_filter is not None:
                payload = self._mux._incoming_frame_filter.filter(payload)

            pending_payload.append(payload)

            if fin:
                break

        if self._stop_requested:
            return None, None

        message = ''.join(pending_payload)
        return first_opcode, message

    def request_stop(self):
        """Asks the thread to stop before reading another frame."""
        self._stop_requested = True

    def run(self):
        """Receives messages and dispatches them until stopped.

        Binary messages are split into channel id and payload and handed to
        MuxClient._dispatch_frame(); everything else goes to
        MuxClient._process_control_message(). The mux client is always
        notified when the loop ends, even when it ends with an exception.
        """
        try:
            while not self._stop_requested:
                # opcode is OPCODE_BINARY or control opcodes when a message
                # is successfully received.
                opcode, message = self._receive_message()
                if opcode is None:
                    return
                if opcode == client_for_testing.OPCODE_BINARY:
                    channel_id, advance = _parse_channel_id(message)
                    self._mux._dispatch_frame(channel_id, message[advance:])
                else:
                    self._mux._process_control_message(opcode, message)
        finally:
            self._mux._notify_reader_done()
353
354
355class _InnerFrame(object):
356    def __init__(self, fin, rsv1, rsv2, rsv3, opcode, payload):
357        self.fin = fin
358        self.rsv1 = rsv1
359        self.rsv2 = rsv2
360        self.rsv3 = rsv3
361        self.opcode = opcode
362        self.payload = payload
363
364
365class _LogicalChannelData(object):
366    def __init__(self):
367        self.queue = Queue.Queue()
368        self.send_quota = 0
369        self.receive_quota = 0
370
371
372class MuxClient(object):
373    """WebSocket mux client.
374
375    Note that this class is NOT thread-safe. Do not access an instance of this
376    class from multiple threads at a same time.
377    """
378
379    def __init__(self, options):
380        self._logger = util.get_class_logger(self)
381
382        self._options = options
383        self._options.enable_mux()
384        self._stream = None
385        self._socket = None
386        self._handshake = client_for_testing.WebSocketHandshake(self._options)
387        self._incoming_frame_filter = None
388        self._outgoing_frame_filter = None
389
390        self._is_active = False
391        self._read_thread = None
392        self._control_blocks_condition = threading.Condition()
393        self._control_blocks = []
394        self._channel_slots = collections.deque()
395        self._logical_channels_condition = threading.Condition();
396        self._logical_channels = {}
397        self._timeout = 2
398        self._physical_connection_close_event = None
399        self._physical_connection_close_message = None
400
401    def _parse_inner_frame(self, data):
402        if len(data) == 0:
403            raise Exception('Invalid encapsulated frame received')
404
405        first_byte = ord(data[0])
406        fin = (first_byte << 7) & 1
407        rsv1 = (first_byte << 6) & 1
408        rsv2 = (first_byte << 5) & 1
409        rsv3 = (first_byte << 4) & 1
410        opcode = first_byte & 0xf
411
412        if self._outgoing_frame_filter:
413            payload = self._outgoing_frame_filter.filter(
414                data[1:])
415        else:
416            payload = data[1:]
417
418        return _InnerFrame(fin, rsv1, rsv2, rsv3, opcode, payload)
419
420    def _process_mux_control_blocks(self):
421        for block in self._control_blocks:
422            if block.opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
423                # AddChannelResponse will be handled in add_channel().
424                continue
425            elif block.opcode == _MUX_OPCODE_FLOW_CONTROL:
426                try:
427                    self._logical_channels_condition.acquire()
428                    if not block.channel_id in self._logical_channels:
429                        raise Exception('Invalid flow control received for '
430                                        'channel id %d' % block.channel_id)
431                    self._logical_channels[block.channel_id].send_quota += (
432                        block.send_quota)
433                    self._logical_channels_condition.notify()
434                finally:
435                    self._logical_channels_condition.release()
436            elif block.opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
437                self._channel_slots.extend([block.send_quota] * block.slots)
438
439    def _dispatch_frame(self, channel_id, payload):
440        if channel_id == _CONTROL_CHANNEL_ID:
441            try:
442                self._control_blocks_condition.acquire()
443                self._control_blocks += _parse_control_blocks(payload)
444                self._process_mux_control_blocks()
445                self._control_blocks_condition.notify()
446            finally:
447                self._control_blocks_condition.release()
448        else:
449            try:
450                self._logical_channels_condition.acquire()
451                if not channel_id in self._logical_channels:
452                    raise Exception('Received logical frame on channel id '
453                                    '%d, which is not established' %
454                                    channel_id)
455
456                inner_frame = self._parse_inner_frame(payload)
457                self._logical_channels[channel_id].receive_quota -= (
458                        len(inner_frame.payload))
459                if self._logical_channels[channel_id].receive_quota < 0:
460                    raise Exception('The server violates quota on '
461                                    'channel id %d' % channel_id)
462            finally:
463                self._logical_channels_condition.release()
464            self._logical_channels[channel_id].queue.put(inner_frame)
465
466    def _process_control_message(self, opcode, message):
467        # Ping/Pong are not supported.
468        if opcode == client_for_testing.OPCODE_CLOSE:
469            self._physical_connection_close_message = message
470            if self._is_active:
471                self._stream.send_close(
472                    code=client_for_testing.STATUS_NORMAL_CLOSURE, reason='')
473                self._read_thread.request_stop()
474
475            if self._physical_connection_close_event:
476                self._physical_connection_close_event.set()
477
478    def _notify_reader_done(self):
479        self._logger.debug('Read thread terminated.')
480        self.close_socket()
481
482    def _assert_channel_slot_available(self):
483        try:
484            self._control_blocks_condition.acquire()
485            if len(self._channel_slots) == 0:
486                # Wait once
487                self._control_blocks_condition.wait(timeout=self._timeout)
488        finally:
489            self._control_blocks_condition.release()
490
491        if len(self._channel_slots) == 0:
492            raise Exception('Failed to receive NewChannelSlot')
493
494    def _assert_send_quota_available(self, channel_id):
495        try:
496            self._logical_channels_condition.acquire()
497            if self._logical_channels[channel_id].send_quota == 0:
498                # Wait once
499                self._logical_channels_condition.wait(timeout=self._timeout)
500        finally:
501            self._logical_channels_condition.release()
502
503        if self._logical_channels[channel_id].send_quota == 0:
504            raise Exception('Failed to receive FlowControl for channel id %d' %
505                            channel_id)
506
507    def connect(self):
508        self._socket = socket.socket()
509        self._socket.settimeout(self._options.socket_timeout)
510
511        self._socket.connect((self._options.server_host,
512                              self._options.server_port))
513        if self._options.use_tls:
514            self._socket = _TLSSocket(self._socket)
515
516        self._handshake.handshake(self._socket)
517        self._stream = client_for_testing.WebSocketStream(
518            self._socket, self._handshake)
519
520        self._logical_channels[_DEFAULT_CHANNEL_ID] = _LogicalChannelData()
521
522        self._read_thread = _MuxReaderThread(self)
523        self._read_thread.start()
524
525        self._assert_channel_slot_available()
526        self._assert_send_quota_available(_DEFAULT_CHANNEL_ID)
527
528        self._is_active = True
529        self._logger.info('Connection established')
530
531    def add_channel(self, channel_id, options):
532        if not self._is_active:
533            raise Exception('Mux client is not active')
534
535        if channel_id in self._logical_channels:
536            raise Exception('Channel id %d already exists' % channel_id)
537
538        try:
539            send_quota = self._channel_slots.popleft()
540        except IndexError, e:
541            raise Exception('No channel slots: %r' % e)
542
543        # Create AddChannel request
544        request_line = 'GET %s HTTP/1.1\r\n' % options.resource
545        fields = []
546        fields.append('Upgrade: websocket\r\n')
547        fields.append('Connection: Upgrade\r\n')
548        if options.server_port == client_for_testing.DEFAULT_PORT:
549            fields.append('Host: %s\r\n' % options.server_host.lower())
550        else:
551            fields.append('Host: %s:%d\r\n' % (options.server_host.lower(),
552                                               options.server_port))
553        fields.append('Origin: %s\r\n' % options.origin.lower())
554
555        original_key = os.urandom(16)
556        key = base64.b64encode(original_key)
557        fields.append('Sec-WebSocket-Key: %s\r\n' % key)
558
559        fields.append('Sec-WebSocket-Version: 13\r\n')
560
561        if len(options.extensions) > 0:
562            fields.append('Sec-WebSocket-Extensions: %s\r\n' %
563                          ', '.join(options.extensions))
564
565        handshake = request_line + ''.join(fields) + '\r\n'
566        add_channel_request = _create_add_channel_request(
567            channel_id, handshake)
568        payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + add_channel_request
569        self._stream.send_binary(payload)
570
571        # Wait AddChannelResponse
572        self._logger.debug('Waiting AddChannelResponse for the request...')
573        response = None
574        try:
575            self._control_blocks_condition.acquire()
576            while True:
577                for block in self._control_blocks:
578                    if block.opcode != _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
579                        continue
580                    if block.channel_id == channel_id:
581                        response = block
582                        self._control_blocks.remove(response)
583                        break
584                if response:
585                    break
586                self._control_blocks_condition.wait(self._timeout)
587                if not self._is_active:
588                    raise Exception('AddChannelRequest timed out')
589        finally:
590            self._control_blocks_condition.release()
591
592        # Validate AddChannelResponse
593        if response.rejected:
594            raise Exception('The server rejected AddChannelRequest')
595
596        fields = _parse_handshake_response(response.encoded_handshake)
597
598        if not 'upgrade' in fields:
599            raise Exception('No Upgrade header')
600        if fields['upgrade'] != 'websocket':
601            raise Exception('Wrong Upgrade header')
602        if not 'connection' in fields:
603            raise Exception('No Connection header')
604        if fields['connection'] != 'Upgrade':
605            raise Exception('Wrong Connection header')
606        if not 'sec-websocket-accept' in fields:
607            raise Exception('No Sec-WebSocket-Accept header')
608
609        accept = fields['sec-websocket-accept']
610        try:
611            decoded_accept = base64.b64decode(accept)
612        except TypeError, e:
613            raise Exception(
614                'Illegal value for header Sec-WebSocket-Accept: ' + accept)
615
616        if len(decoded_accept) != 20:
617            raise Exception(
618                'Decoded value of Sec-WebSocket-Accept is not 20-byte long')
619
620        original_expected_accept = util.sha1_hash(
621            key + client_for_testing.WEBSOCKET_ACCEPT_UUID).digest()
622        expected_accept = base64.b64encode(original_expected_accept)
623
624        if accept != expected_accept:
625            raise Exception(
626                'Invalid Sec-WebSocket-Accept header: %r (expected) != %r '
627                '(actual)' % (accept, expected_accept))
628
629        self._logical_channels_condition.acquire()
630        self._logical_channels[channel_id] = _LogicalChannelData()
631        self._logical_channels[channel_id].send_quota = send_quota
632        self._logical_channels_condition.release()
633
634        self._logger.debug('Logical channel %d established' % channel_id)
635
636    def _check_logical_channel_is_opened(self, channel_id):
637        if not self._is_active:
638            raise Exception('Mux client is not active')
639
640        if not channel_id in self._logical_channels:
641            raise Exception('Logical channel %d is not established.')
642
643    def drop_channel(self, channel_id):
644        # TODO(bashi): Implement
645        pass
646
647    def send_flow_control(self, channel_id, replenished_quota):
648        self._check_logical_channel_is_opened(channel_id)
649        flow_control = _create_flow_control(channel_id, replenished_quota)
650        payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + flow_control
651        # Replenish receive quota
652        try:
653            self._logical_channels_condition.acquire()
654            self._logical_channels[channel_id].receive_quota += (
655                replenished_quota)
656        finally:
657            self._logical_channels_condition.release()
658        self._stream.send_binary(payload)
659
660    def send_message(self, channel_id, message, end=True, binary=False):
661        self._check_logical_channel_is_opened(channel_id)
662
663        if binary:
664            first_byte = (end << 7) | client_for_testing.OPCODE_BINARY
665        else:
666            first_byte = (end << 7) | client_for_testing.OPCODE_TEXT
667            message = message.encode('utf-8')
668
669        try:
670            self._logical_channels_condition.acquire()
671            if self._logical_channels[channel_id].send_quota < len(message):
672                raise Exception('Send quota violation: %d < %d' % (
673                        self._logical_channels[channel_id].send_quota,
674                        len(message)))
675
676            self._logical_channels[channel_id].send_quota -= len(message)
677        finally:
678            self._logical_channels_condition.release()
679        payload = _encode_channel_id(channel_id) + chr(first_byte) + message
680        self._stream.send_binary(payload)
681
682    def assert_receive(self, channel_id, payload, binary=False):
683        self._check_logical_channel_is_opened(channel_id)
684
685        try:
686            inner_frame = self._logical_channels[channel_id].queue.get(
687                timeout=self._timeout)
688        except Queue.Empty, e:
689            raise Exception('Cannot receive message from channel id %d' %
690                            channel_id)
691
692        if binary:
693            opcode = client_for_testing.OPCODE_BINARY
694        else:
695            opcode = client_for_testing.OPCODE_TEXT
696
697        if inner_frame.opcode != opcode:
698            raise Exception('Unexpected opcode received (%r != %r)' %
699                            (expected_opcode, inner_frame.opcode))
700
701        if inner_frame.payload != payload:
702            raise Exception('Unexpected payload received')
703
704    def send_close(self, channel_id, code=None, reason=''):
705        self._check_logical_channel_is_opened(channel_id)
706
707        if code is not None:
708            body = struct.pack('!H', code) + reason.encode('utf-8')
709        else:
710            body = ''
711
712        first_byte = (1 << 7) | client_for_testing.OPCODE_CLOSE
713        payload = _encode_channel_id(channel_id) + chr(first_byte) + body
714        self._stream.send_binary(payload)
715
716    def assert_receive_close(self, channel_id):
717        self._check_logical_channel_is_opened(channel_id)
718
719        try:
720            inner_frame = self._logical_channels[channel_id].queue.get(
721                timeout=self._timeout)
722        except Queue.Empty, e:
723            raise Exception('Cannot receive message from channel id %d' %
724                            channel_id)
725        if inner_frame.opcode != client_for_testing.OPCODE_CLOSE:
726            raise Exception('Didn\'t receive close frame')
727
728    def send_physical_connection_close(self, code=None, reason=''):
729        self._physical_connection_close_event = threading.Event()
730        self._stream.send_close(code, reason)
731
732    # This method can be used only after calling
733    # send_physical_connection_close().
734    def assert_physical_connection_receive_close(
735        self, code=client_for_testing.STATUS_NORMAL_CLOSURE, reason=''):
736        self._physical_connection_close_event.wait(timeout=self._timeout)
737        if (not self._physical_connection_close_event.isSet() or
738            not self._physical_connection_close_message):
739            raise Exception('Didn\'t receive closing handshake')
740
741    def close_socket(self):
742        self._is_active = False
743        self._socket.close()
744