1# Copyright 2012, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8#     * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10#     * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14#     * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31"""This file provides classes and helper functions for multiplexing extension.
32
33Specification:
34http://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-03
35"""
36
37
38import collections
39import copy
40import email
41import email.parser
42import logging
43import math
44import struct
45import threading
46import traceback
47
48from mod_pywebsocket import common
49from mod_pywebsocket import handshake
50from mod_pywebsocket import util
51from mod_pywebsocket._stream_base import BadOperationException
52from mod_pywebsocket._stream_base import ConnectionTerminatedException
53from mod_pywebsocket._stream_hybi import Frame
54from mod_pywebsocket._stream_hybi import Stream
55from mod_pywebsocket._stream_hybi import StreamOptions
56from mod_pywebsocket._stream_hybi import create_binary_frame
57from mod_pywebsocket._stream_hybi import create_closing_handshake_body
58from mod_pywebsocket._stream_hybi import create_header
59from mod_pywebsocket._stream_hybi import parse_frame
60from mod_pywebsocket.handshake import hybi
61
62
# Channel id that is prepended to payloads carrying multiplexing control
# blocks (see the _create_* helpers).
_CONTROL_CHANNEL_ID = 0
# NOTE(review): not referenced in this part of the file; presumably the id of
# the implicitly opened first logical channel -- confirm against the spec.
_DEFAULT_CHANNEL_ID = 1

# Opcodes of multiplexing control blocks. The opcode occupies the top three
# bits of the first byte of a control block.
_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
_MUX_OPCODE_FLOW_CONTROL = 2
_MUX_OPCODE_DROP_CHANNEL = 3
_MUX_OPCODE_NEW_CHANNEL_SLOT = 4

# Largest channel id that the 4-byte channel id encoding can carry (29 bits).
_MAX_CHANNEL_ID = (1 << 29) - 1

# NOTE(review): the two values below are not used in this part of the file;
# presumably the initial slot count and per-channel quota granted to a
# client -- confirm against their users.
_INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64
_INITIAL_QUOTA_FOR_CLIENT = 8 * 1024

# Reason phrases keyed by HTTP status code. Only these status codes are
# needed for now.
_HTTP_BAD_RESPONSE_MESSAGES = {
    common.HTTP_STATUS_BAD_REQUEST: 'Bad Request',
}
81
82
class MuxUnexpectedException(Exception):
    """Raised when the multiplexing code is used in an unexpected way.

    For example, when a logical connection is asked to write while a
    previous write is still pending.
    """
86
87
88# Temporary
class MuxNotImplementedException(Exception):
    """Signals that execution reached a code path that is not implemented
    yet.
    """
92
93
class InvalidMuxFrameException(Exception):
    """Signals that a received multiplexed frame is malformed (e.g. its
    channel id or inner frame bits are missing).
    """
97
98
class InvalidMuxControlBlockException(Exception):
    """Signals that a received multiplexing control block is malformed."""
102
103
class LogicalConnectionClosedException(Exception):
    """Signals that a logical connection has been closed gracefully, i.e.
    the closing handshake for it has been received.
    """
107
108
def _encode_channel_id(channel_id):
    """Encodes a channel id into its variable-length wire format.

    The encoding uses 1, 2, 3 or 4 bytes depending on the magnitude of the
    id. The leading bits of the first byte (0, 10, 110 or 111) tell the
    decoder which length was used.

    Args:
        channel_id: channel id to encode (0 <= channel_id < 2 ** 29).

    Returns:
        The encoded channel id.

    Raises:
        ValueError: when channel_id is negative or does not fit in 29 bits.
    """

    if channel_id < 0:
        raise ValueError('Channel id %d must not be negative' % channel_id)
    if channel_id >= 2 ** 29:
        raise ValueError('Channel id %d is too large' % channel_id)

    # Pick the shortest form that can hold the id, largest form first.
    if channel_id >= 2 ** 21:
        return struct.pack('!L', 0xe0000000 + channel_id)
    if channel_id >= 2 ** 14:
        leading_byte = chr(0xc0 + (channel_id >> 16))
        return leading_byte + struct.pack('!H', channel_id & 0xffff)
    if channel_id >= 2 ** 7:
        return struct.pack('!H', 0x8000 + channel_id)
    return chr(channel_id)
124
125
def _size_of_number_in_bytes_minus_1(number):
    """Returns the minimum number of bytes needed to store number, minus 1.

    Args:
        number: non-negative integer smaller than 2 ** 32.

    Returns:
        An integer between 0 and 3.

    Raises:
        ValueError: when number is negative or needs more than 4 bytes.
    """

    if number < 0:
        raise ValueError('Invalid number: %d' % number)

    # Try 1, 2, 3 and 4 bytes in turn and stop at the first width that fits.
    for size_minus_1 in range(4):
        if number < 2 ** (8 * (size_minus_1 + 1)):
            return size_minus_1

    raise ValueError('Invalid number %d' % number)
141
142
def _encode_number(number):
    """Encodes number as a big-endian unsigned integer using the minimum
    number of bytes (1 to 4).

    No range validation is done here; callers are expected to pass a value
    between 0 and 2 ** 32 - 1.

    Args:
        number: integer to encode.

    Returns:
        The encoded number.
    """

    if number >= 2 ** 24:
        return struct.pack('!L', number)
    if number >= 2 ** 16:
        # struct has no 3-byte format, so emit the top byte separately.
        return chr(number >> 16) + struct.pack('!H', number & 0xffff)
    if number >= 2 ** 8:
        return struct.pack('!H', number)
    return chr(number)
152
153
def _create_control_block_length_value(channel_id, opcode, flags, value):
    """Builds a control block of the common "length + value" layout.

    The block consists of a first byte (opcode, flags and the size of the
    length field minus 1), the objective channel id, the encoded length of
    the opcode specific value, and the value itself. Most control blocks
    share this structure.

    Args:
        channel_id: objective channel id.
        opcode: opcode of the control block. Only AddChannelRequest,
            AddChannelResponse and DropChannel are accepted.
        flags: 3bit opcode specific flags.
        value: opcode specific data.

    Returns:
        The encoded control block.

    Raises:
        ValueError: when any argument is out of range.
    """

    if not 0 <= channel_id <= _MAX_CHANNEL_ID:
        raise ValueError('Invalid channel id: %d' % channel_id)

    length_value_opcodes = (_MUX_OPCODE_ADD_CHANNEL_REQUEST,
                            _MUX_OPCODE_ADD_CHANNEL_RESPONSE,
                            _MUX_OPCODE_DROP_CHANNEL)
    if opcode not in length_value_opcodes:
        raise ValueError('Invalid opcode: %d' % opcode)

    if not 0 <= flags <= 7:
        raise ValueError('Invalid flags: %x' % flags)

    length = len(value)
    if not 0 <= length <= 2 ** 32 - 1:
        raise ValueError('Invalid length: %d' % length)

    # First byte layout: opcode (3 bits), flags (3 bits), size of the length
    # field in bytes minus 1 (2 bits).
    size_of_length_minus_1 = _size_of_number_in_bytes_minus_1(length)
    header_byte = (opcode << 5) | (flags << 2) | size_of_length_minus_1

    length_field = _encode_number(length)

    return (chr(header_byte) + _encode_channel_id(channel_id) +
            length_field + value)
187
188
def _create_add_channel_response(channel_id, encoded_handshake,
                                 encoding=0, rejected=False,
                                 outer_frame_mask=False):
    """Builds a binary frame carrying an AddChannelResponse control block.

    Args:
        channel_id: objective channel id.
        encoded_handshake: encoded handshake to put in the response.
        encoding: encoding of the handshake; must be 0 or 1.
        rejected: whether the request was rejected. Stored as the highest
            flag bit.
        outer_frame_mask: passed to create_binary_frame as its mask argument.

    Returns:
        The frame built by create_binary_frame.

    Raises:
        ValueError: when encoding is neither 0 nor 1.
    """

    if encoding not in (0, 1):
        raise ValueError('Invalid encoding %d' % encoding)

    control_block = _create_control_block_length_value(
        channel_id,
        _MUX_OPCODE_ADD_CHANNEL_RESPONSE,
        (rejected << 2) | encoding,
        encoded_handshake)
    # Control blocks are addressed to the control channel.
    control_frame_payload = (
        _encode_channel_id(_CONTROL_CHANNEL_ID) + control_block)
    return create_binary_frame(control_frame_payload, mask=outer_frame_mask)
200
201
def _create_drop_channel(channel_id, reason='', mux_error=False,
                         outer_frame_mask=False):
    """Builds a binary frame carrying a DropChannel control block.

    Args:
        channel_id: objective channel id.
        reason: reason data; only allowed when mux_error is set.
        mux_error: whether the channel is dropped because of an error.
            Stored as the highest flag bit.
        outer_frame_mask: passed to create_binary_frame as its mask argument.

    Returns:
        The frame built by create_binary_frame.

    Raises:
        ValueError: when a reason is given although mux_error is False.
    """

    # NOTE(review): _MuxFramePayloadParser._read_drop_channel rejects a
    # non-empty reason when the flag IS set, which is the opposite
    # condition. Confirm against the specification which one is intended.
    if not mux_error and len(reason) > 0:
        raise ValueError('Reason must be empty if mux_error is False')

    control_block = _create_control_block_length_value(
        channel_id, _MUX_OPCODE_DROP_CHANNEL, mux_error << 2, reason)
    # Control blocks are addressed to the control channel.
    control_frame_payload = (
        _encode_channel_id(_CONTROL_CHANNEL_ID) + control_block)
    return create_binary_frame(control_frame_payload, mask=outer_frame_mask)
212
213
def _create_flow_control(channel_id, replenished_quota,
                         outer_frame_mask=False):
    """Builds a binary frame carrying a FlowControl control block.

    Args:
        channel_id: objective channel id.
        replenished_quota: quota to announce (0 <= quota < 2 ** 32).
        outer_frame_mask: passed to create_binary_frame as its mask argument.

    Returns:
        The frame built by create_binary_frame.

    Raises:
        ValueError: when replenished_quota is out of range.
    """

    if not 0 <= replenished_quota < 2 ** 32:
        raise ValueError('Invalid quota: %d' % replenished_quota)

    # First byte: opcode in the top three bits, size of the quota field in
    # bytes minus 1 in the lowest two bits.
    quota_size_minus_1 = _size_of_number_in_bytes_minus_1(replenished_quota)
    header_byte = (_MUX_OPCODE_FLOW_CONTROL << 5) | quota_size_minus_1

    control_frame_payload = (
        _encode_channel_id(_CONTROL_CHANNEL_ID) + chr(header_byte) +
        _encode_channel_id(channel_id) + _encode_number(replenished_quota))
    return create_binary_frame(control_frame_payload, mask=outer_frame_mask)
224
225
def _create_new_channel_slot(slots, send_quota, outer_frame_mask=False):
    """Builds a binary frame carrying a NewChannelSlot control block.

    Args:
        slots: number of slots to announce (0 <= slots < 2 ** 32).
        send_quota: send quota to announce (0 <= send_quota < 2 ** 32).
        outer_frame_mask: passed to create_binary_frame as its mask argument.

    Returns:
        The frame built by create_binary_frame.

    Raises:
        ValueError: when slots or send_quota is out of range.
    """

    if not 0 <= slots < 2 ** 32:
        raise ValueError('Invalid number of slots: %d' % slots)
    if not 0 <= send_quota < 2 ** 32:
        raise ValueError('Invalid send quota: %d' % send_quota)

    # First byte: opcode (3 bits), one unused bit, size of the slots field
    # minus 1 (2 bits), size of the send quota field minus 1 (2 bits).
    header_byte = ((_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) |
                   (_size_of_number_in_bytes_minus_1(slots) << 2) |
                   _size_of_number_in_bytes_minus_1(send_quota))

    control_frame_payload = (
        _encode_channel_id(_CONTROL_CHANNEL_ID) + chr(header_byte) +
        _encode_number(slots) + _encode_number(send_quota))
    return create_binary_frame(control_frame_payload, mask=outer_frame_mask)
239
240
def _parse_request_text(request_text):
    """Splits an HTTP request into its request line parts and headers.

    Args:
        request_text: request text; the request line and the headers are
            separated by CRLF.

    Returns:
        A tuple of (command, path, version, headers) where headers is the
        message object produced by email.parser.Parser.

    Raises:
        ValueError: when there is no CRLF after the request line, when the
            request line does not consist of exactly three space separated
            words, or when the version is not HTTP/1.1.
    """

    request_line, header_lines = request_text.split('\r\n', 1)

    tokens = request_line.split(' ')
    if len(tokens) != 3:
        raise ValueError('Bad Request-Line syntax %r' % request_line)
    command, path, version = tokens
    if version != 'HTTP/1.1':
        raise ValueError('Bad request version %r' % version)

    # RFC 6455 refers to RFC 2616 for handshake parsing, and RFC 2616 in turn
    # refers to RFC 822. email.parser.Parser() parses RFC 2822 (RFC 822)
    # style headers, so it can be used here.
    header_parser = email.parser.Parser()
    return command, path, version, header_parser.parsestr(header_lines)
256
257
class _ControlBlock(object):
    """Holds the result of parsing one multiplexing control block.

    Only the opcode is set here. _MuxFramePayloadParser attaches the
    attributes that are specific to each kind of control block afterwards
    (e.g. encoded_handshake for AddChannelRequest).
    """

    def __init__(self, opcode):
        """Constructs an instance.

        Args:
            opcode: opcode of the control block.
        """

        self.opcode = opcode
267
268
class _MuxFramePayloadParser(object):
    """A class that parses multiplexed frame payload.

    The parser keeps a read position into the payload. Every read method
    consumes data starting at that position and advances it.
    """

    def __init__(self, payload):
        """Constructs an instance.

        Args:
            payload: payload data of a multiplexed frame.
        """

        self._data = payload
        self._read_position = 0
        self._logger = util.get_class_logger(self)

    def read_channel_id(self):
        """Reads a variable-length (1 to 4 bytes) channel id.

        The leading bits of the first byte select the length of the
        encoding: 0 (1 byte), 10 (2 bytes), 110 (3 bytes), 111 (4 bytes).

        Returns:
            The decoded channel id.

        Raises:
            InvalidMuxFrameException: when the payload doesn't contain
                valid channel id.
        """

        remaining_length = len(self._data) - self._read_position
        pos = self._read_position
        if remaining_length == 0:
            raise InvalidMuxFrameException('No channel id found')

        channel_id = ord(self._data[pos])
        channel_id_length = 1
        # The longest prefix must be tested first because 0xe0 also matches
        # the 0xc0 and 0x80 tests.
        if channel_id & 0xe0 == 0xe0:
            if remaining_length < 4:
                raise InvalidMuxFrameException(
                    'Invalid channel id format')
            channel_id = struct.unpack('!L',
                                       self._data[pos:pos+4])[0] & 0x1fffffff
            channel_id_length = 4
        elif channel_id & 0xc0 == 0xc0:
            if remaining_length < 3:
                raise InvalidMuxFrameException(
                    'Invalid channel id format')
            channel_id = (((channel_id & 0x1f) << 16) +
                          struct.unpack('!H', self._data[pos+1:pos+3])[0])
            channel_id_length = 3
        elif channel_id & 0x80 == 0x80:
            if remaining_length < 2:
                raise InvalidMuxFrameException(
                    'Invalid channel id format')
            channel_id = struct.unpack('!H',
                                       self._data[pos:pos+2])[0] & 0x3fff
            channel_id_length = 2
        self._read_position += channel_id_length

        return channel_id

    def read_inner_frame(self):
        """Reads an inner frame.

        The first byte holds the fin and rsv bits and the opcode; all
        remaining data is the payload of the inner frame.

        Returns:
            A tuple of (fin, rsv1, rsv2, rsv3, opcode, payload).

        Raises:
            InvalidMuxFrameException: when the inner frame is invalid.
        """

        if len(self._data) == self._read_position:
            raise InvalidMuxFrameException('No inner frame bits found')
        bits = ord(self._data[self._read_position])
        self._read_position += 1
        fin = (bits & 0x80) == 0x80
        rsv1 = (bits & 0x40) == 0x40
        rsv2 = (bits & 0x20) == 0x20
        rsv3 = (bits & 0x10) == 0x10
        opcode = bits & 0xf
        payload = self.remaining_data()
        # Consume rest of the message which is payload data of the original
        # frame.
        self._read_position = len(self._data)
        return fin, rsv1, rsv2, rsv3, opcode, payload

    def _read_number(self, size):
        """Reads a big-endian unsigned integer.

        Args:
            size: size of the number in bytes (1 to 4).

        Returns:
            The decoded number.

        Raises:
            InvalidMuxControlBlockException: when fewer than size bytes are
                left, or size is not between 1 and 4.
        """

        if self._read_position + size > len(self._data):
            raise InvalidMuxControlBlockException(
                'Cannot read %d byte(s) number' % size)

        pos = self._read_position
        if size == 1:
            self._read_position += 1
            return ord(self._data[pos])
        elif size == 2:
            self._read_position += 2
            return struct.unpack('!H', self._data[pos:pos+2])[0]
        elif size == 3:
            # struct has no 3-byte format, so combine the top byte with a
            # 2-byte number.
            self._read_position += 3
            return ((ord(self._data[pos]) << 16)
                    + struct.unpack('!H', self._data[pos+1:pos+3])[0])
        elif size == 4:
            self._read_position += 4
            return struct.unpack('!L', self._data[pos:pos+4])[0]
        else:
            raise InvalidMuxControlBlockException(
                'Cannot read %d byte(s) number' % size)

    def _read_opcode_specific_data(self, opcode, size_of_size):
        """Reads opcode specific data that consists of followings:
            - the size of the opcode specific data (1-4 bytes)
            - the opcode specific data
        AddChannelRequest and DropChannel have this structure.

        Args:
            opcode: opcode of the control block being read. Used only for
                error messages.
            size_of_size: size of the size field in bytes.

        Returns:
            The opcode specific data.

        Raises:
            InvalidMuxControlBlockException: when the size field or the
                data is truncated.
        """

        if self._read_position + size_of_size > len(self._data):
            raise InvalidMuxControlBlockException(
                'No size field for opcode %d' % opcode)

        size = self._read_number(size_of_size)

        pos = self._read_position
        if pos + size > len(self._data):
            raise InvalidMuxControlBlockException(
                'No data field for opcode %d (%d + %d > %d)' %
                (opcode, pos, size, len(self._data)))

        specific_data = self._data[pos:pos+size]
        self._read_position += size
        return specific_data

    def _read_add_channel_request(self, first_byte, control_block):
        """Reads the rest of an AddChannelRequest and stores channel_id,
        encoding and encoded_handshake into control_block.
        """

        # Bit 4 of the first byte is not interpreted here.
        encoding = (first_byte >> 2) & 0x3
        size_of_handshake_size = (first_byte & 0x3) + 1

        control_block.channel_id = self.read_channel_id()
        encoded_handshake = self._read_opcode_specific_data(
                                _MUX_OPCODE_ADD_CHANNEL_REQUEST,
                                size_of_handshake_size)
        control_block.encoding = encoding
        control_block.encoded_handshake = encoded_handshake
        return control_block

    def _read_flow_control(self, first_byte, control_block):
        """Reads the rest of a FlowControl and stores channel_id and
        send_quota into control_block.
        """

        quota_size = (first_byte & 0x3) + 1
        control_block.channel_id = self.read_channel_id()
        control_block.send_quota = self._read_number(quota_size)
        return control_block

    def _read_drop_channel(self, first_byte, control_block):
        """Reads the rest of a DropChannel and stores channel_id, mux_error
        and reason into control_block.

        Raises:
            InvalidMuxControlBlockException: when the block is truncated,
                or a reason is present although the F bit is set.
        """

        # Bits 2-3 of the first byte are not interpreted here.
        mux_error = (first_byte >> 4) & 0x1
        size_of_reason_size = (first_byte & 0x3) + 1

        control_block.channel_id = self.read_channel_id()
        reason = self._read_opcode_specific_data(
                     _MUX_OPCODE_DROP_CHANNEL,
                     size_of_reason_size)
        # NOTE(review): _create_drop_channel enforces the opposite condition
        # (reason must be empty when mux_error is False). Confirm against
        # the specification which one is intended.
        if mux_error and len(reason) > 0:
            raise InvalidMuxControlBlockException(
                'Reason must be empty when F bit is set')
        control_block.mux_error = mux_error
        control_block.reason = reason
        return control_block

    def _read_new_channel_slot(self, first_byte, control_block):
        """Not implemented yet. Always raises MuxNotImplementedException."""

        # TODO(bashi): Implement
        raise MuxNotImplementedException('NewChannelSlot is not implemented')

    def read_control_blocks(self):
        """Reads control block(s).

        This is a generator. It yields one _ControlBlock per control block
        found in the rest of the payload and finishes when all data has been
        consumed.

        Raises:
           InvalidMuxControlBlockException: when the payload contains invalid
               control block(s).
           MuxNotImplementedException: when a NewChannelSlot is found.
        """

        while self._read_position < len(self._data):
            first_byte = ord(self._data[self._read_position])
            self._read_position += 1
            opcode = (first_byte >> 5) & 0x7
            control_block = _ControlBlock(opcode=opcode)
            if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
                yield self._read_add_channel_request(first_byte, control_block)
            elif opcode == _MUX_OPCODE_FLOW_CONTROL:
                yield self._read_flow_control(first_byte, control_block)
            elif opcode == _MUX_OPCODE_DROP_CHANNEL:
                yield self._read_drop_channel(first_byte, control_block)
            elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
                yield self._read_new_channel_slot(first_byte, control_block)
            else:
                raise InvalidMuxControlBlockException(
                    'Invalid opcode %d' % opcode)
        assert self._read_position == len(self._data)
        # Falling off the end finishes the generator. Raising StopIteration
        # explicitly would be turned into RuntimeError by newer Python
        # versions.

    def remaining_data(self):
        """Returns remaining data."""

        return self._data[self._read_position:]
459
460
class _LogicalRequest(object):
    """Request object of a logical channel. Mimics mod_python request."""

    def __init__(self, channel_id, command, path, headers, connection):
        """Constructs an instance.

        Args:
            channel_id: the channel id of the logical channel.
            command: HTTP request command. Exposed as method.
            path: HTTP request path. Exposed as uri.
            headers: HTTP headers. Exposed as headers_in.
            connection: _LogicalConnection instance.
        """

        # The logical channel this request belongs to.
        self.channel_id = channel_id
        self.connection = connection

        # Request line and headers.
        self.method = command
        self.uri = path
        self.headers_in = headers

        # Neither side has terminated the channel yet.
        self.client_terminated = False
        self.server_terminated = False

    def is_https(self):
        """Mimics request.is_https().

        Always returns False because this method is used only by old
        protocols (hixie and hybi00).
        """

        return False
489
490
class _LogicalConnection(object):
    """Mimics mod_python mp_conn.

    Outgoing data is handed to the mux handler. Incoming data is pushed in
    by append_frame_data() and handed out by read(). Both directions are
    synchronized with condition variables.
    """

    # Read states. For details, see the comment of set_read_state().
    STATE_ACTIVE = 1
    STATE_GRACEFULLY_CLOSED = 2
    STATE_TERMINATED = 3

    def __init__(self, mux_handler, channel_id):
        """Constructs an instance.

        Args:
            mux_handler: _MuxHandler instance.
            channel_id: channel id of this connection.
        """

        self._mux_handler = mux_handler
        self._channel_id = channel_id
        self._incoming_data = ''
        self._write_condition = threading.Condition()
        self._waiting_write_completion = False
        self._read_condition = threading.Condition()
        self._read_state = self.STATE_ACTIVE

    def get_local_addr(self):
        """Getter to mimic mp_conn.local_addr."""

        return self._mux_handler.physical_connection.get_local_addr()
    local_addr = property(get_local_addr)

    def get_remote_addr(self):
        """Getter to mimic mp_conn.remote_addr."""

        return self._mux_handler.physical_connection.get_remote_addr()
    remote_addr = property(get_remote_addr)

    def get_memorized_lines(self):
        """Gets memorized lines. Not supported.

        Raises:
            MuxUnexpectedException: always.
        """

        raise MuxUnexpectedException('_LogicalConnection does not support '
                                     'get_memorized_lines')

    def write(self, data):
        """Writes data. mux_handler sends data asynchronously. The caller will
        be suspended until write done.

        Args:
            data: data to be written.

        Raises:
            MuxUnexpectedException: when called before finishing the previous
                write.
        """

        # Acquire outside of the try block so that the lock is released only
        # when it has actually been acquired.
        self._write_condition.acquire()
        try:
            if self._waiting_write_completion:
                raise MuxUnexpectedException(
                    'Logical connection %d is already waiting the completion '
                    'of write' % self._channel_id)

            self._waiting_write_completion = True
            self._mux_handler.send_data(self._channel_id, data)
            # Woken up by notify_write_done().
            self._write_condition.wait()
        finally:
            self._write_condition.release()

    def write_control_data(self, data):
        """Writes data via the control channel. Don't wait finishing write
        because this method can be called by mux dispatcher.

        Args:
            data: data to be written.
        """

        self._mux_handler.send_control_data(data)

    def notify_write_done(self):
        """Called when sending data is completed. Wakes up the caller that
        is suspended in write().

        Raises:
            MuxUnexpectedException: when no write is pending.
        """

        self._write_condition.acquire()
        try:
            if not self._waiting_write_completion:
                raise MuxUnexpectedException(
                    'Invalid call of notify_write_done for logical connection'
                    ' %d' % self._channel_id)
            self._waiting_write_completion = False
            self._write_condition.notify()
        finally:
            self._write_condition.release()

    def append_frame_data(self, frame_data):
        """Appends incoming frame data. Called when mux_handler dispatches
        frame data to the corresponding application.

        Args:
            frame_data: incoming frame data.
        """

        self._read_condition.acquire()
        try:
            # Release the lock even when the concatenation fails. Otherwise
            # every later read() would block forever.
            self._incoming_data += frame_data
            self._read_condition.notify()
        finally:
            self._read_condition.release()

    def read(self, length):
        """Reads data. Blocks until enough data has arrived via physical
        connection.

        Args:
            length: length of data to be read.

        Returns:
            The first length bytes of the buffered incoming data.

        Raises:
            LogicalConnectionClosedException: when closing handshake for this
                logical channel has been received.
            ConnectionTerminatedException: when the physical connection has
                closed, or an error is caused on the reader thread.
        """

        self._read_condition.acquire()
        try:
            # Woken up by append_frame_data() and set_read_state().
            while (self._read_state == self.STATE_ACTIVE and
                   len(self._incoming_data) < length):
                self._read_condition.wait()

            # Once the connection has left the active state, data that is
            # still buffered is not handed out.
            if self._read_state == self.STATE_GRACEFULLY_CLOSED:
                raise LogicalConnectionClosedException(
                    'Logical channel %d has closed.' % self._channel_id)
            elif self._read_state == self.STATE_TERMINATED:
                raise ConnectionTerminatedException(
                    'Receiving %d byte failed. Logical channel (%d) closed' %
                    (length, self._channel_id))

            value = self._incoming_data[:length]
            self._incoming_data = self._incoming_data[length:]
        finally:
            self._read_condition.release()

        return value

    def set_read_state(self, new_state):
        """Sets the state of this connection. Called when an event for this
        connection has occurred.

        Args:
            new_state: state to be set. new_state must be one of followings:
            - STATE_GRACEFULLY_CLOSED: when closing handshake for this
                connection has been received.
            - STATE_TERMINATED: when the physical connection has closed or
                DropChannel of this connection has received.
        """

        self._read_condition.acquire()
        try:
            self._read_state = new_state
            # Wake up a reader that is blocked in read().
            self._read_condition.notify()
        finally:
            self._read_condition.release()
645
646
647class _LogicalStream(Stream):
648    """Mimics the Stream class. This class interprets multiplexed WebSocket
649    frames.
650    """
651
652    def __init__(self, request, send_quota, receive_quota):
653        """Constructs an instance.
654
655        Args:
656            request: _LogicalRequest instance.
657            send_quota: Initial send quota.
658            receive_quota: Initial receive quota.
659        """
660
661        # TODO(bashi): Support frame filters.
662        stream_options = StreamOptions()
663        # Physical stream is responsible for masking.
664        stream_options.unmask_receive = False
665        # Control frames can be fragmented on logical channel.
666        stream_options.allow_fragmented_control_frame = True
667        Stream.__init__(self, request, stream_options)
668        self._send_quota = send_quota
669        self._send_quota_condition = threading.Condition()
670        self._receive_quota = receive_quota
671        self._write_inner_frame_semaphore = threading.Semaphore()
672
673    def _create_inner_frame(self, opcode, payload, end=True):
674        # TODO(bashi): Support extensions that use reserved bits.
675        first_byte = (end << 7) | opcode
676        return (_encode_channel_id(self._request.channel_id) +
677                chr(first_byte) + payload)
678
679    def _write_inner_frame(self, opcode, payload, end=True):
680        payload_length = len(payload)
681        write_position = 0
682
683        try:
684            # An inner frame will be fragmented if there is no enough send
685            # quota. This semaphore ensures that fragmented inner frames are
686            # sent in order on the logical channel.
687            # Note that frames that come from other logical channels or
688            # multiplexing control blocks can be inserted between fragmented
689            # inner frames on the physical channel.
690            self._write_inner_frame_semaphore.acquire()
691            while write_position < payload_length:
692                try:
693                    self._send_quota_condition.acquire()
694                    while self._send_quota == 0:
695                        self._logger.debug(
696                            'No quota. Waiting FlowControl message for %d.' %
697                            self._request.channel_id)
698                        self._send_quota_condition.wait()
699
700                    remaining = payload_length - write_position
701                    write_length = min(self._send_quota, remaining)
702                    inner_frame_end = (
703                        end and
704                        (write_position + write_length == payload_length))
705
706                    inner_frame = self._create_inner_frame(
707                        opcode,
708                        payload[write_position:write_position+write_length],
709                        inner_frame_end)
710                    frame_data = self._writer.build(
711                        inner_frame, end=True, binary=True)
712                    self._send_quota -= write_length
713                    self._logger.debug('Consumed quota=%d, remaining=%d' %
714                                       (write_length, self._send_quota))
715                finally:
716                    self._send_quota_condition.release()
717
718                # Writing data will block the worker so we need to release
719                # _send_quota_condition before writing.
720                self._logger.debug('Sending inner frame: %r' % frame_data)
721                self._request.connection.write(frame_data)
722                write_position += write_length
723
724                opcode = common.OPCODE_CONTINUATION
725
726        except ValueError, e:
727            raise BadOperationException(e)
728        finally:
729            self._write_inner_frame_semaphore.release()
730
731    def replenish_send_quota(self, send_quota):
732        """Replenish send quota."""
733
734        self._send_quota_condition.acquire()
735        self._send_quota += send_quota
736        self._logger.debug('Replenished send quota for channel id %d: %d' %
737                           (self._request.channel_id, self._send_quota))
738        self._send_quota_condition.notify()
739        self._send_quota_condition.release()
740
741    def consume_receive_quota(self, amount):
742        """Consumes receive quota. Returns False on failure."""
743
744        if self._receive_quota < amount:
745            self._logger.debug('Violate quota on channel id %d: %d < %d' %
746                               (self._request.channel_id,
747                                self._receive_quota, amount))
748            return False
749        self._receive_quota -= amount
750        return True
751
752    def send_message(self, message, end=True, binary=False):
753        """Override Stream.send_message."""
754
755        if self._request.server_terminated:
756            raise BadOperationException(
757                'Requested send_message after sending out a closing handshake')
758
759        if binary and isinstance(message, unicode):
760            raise BadOperationException(
761                'Message for binary frame must be instance of str')
762
763        if binary:
764            opcode = common.OPCODE_BINARY
765        else:
766            opcode = common.OPCODE_TEXT
767            message = message.encode('utf-8')
768
769        self._write_inner_frame(opcode, message, end)
770
771    def _receive_frame(self):
772        """Overrides Stream._receive_frame.
773
774        In addition to call Stream._receive_frame, this method adds the amount
775        of payload to receiving quota and sends FlowControl to the client.
776        We need to do it here because Stream.receive_message() handles
777        control frames internally.
778        """
779
780        opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self)
781        amount = len(payload)
782        self._receive_quota += amount
783        frame_data = _create_flow_control(self._request.channel_id,
784                                          amount)
785        self._logger.debug('Sending flow control for %d, replenished=%d' %
786                           (self._request.channel_id, amount))
787        self._request.connection.write_control_data(frame_data)
788        return opcode, payload, fin, rsv1, rsv2, rsv3
789
790    def receive_message(self):
791        """Overrides Stream.receive_message."""
792
793        # Just call Stream.receive_message(), but catch
794        # LogicalConnectionClosedException, which is raised when the logical
795        # connection has closed gracefully.
796        try:
797            return Stream.receive_message(self)
798        except LogicalConnectionClosedException, e:
799            self._logger.debug('%s', e)
800            return None
801
802    def _send_closing_handshake(self, code, reason):
803        """Overrides Stream._send_closing_handshake."""
804
805        body = create_closing_handshake_body(code, reason)
806        self._logger.debug('Sending closing handshake for %d: (%r, %r)' %
807                           (self._request.channel_id, code, reason))
808        self._write_inner_frame(common.OPCODE_CLOSE, body, end=True)
809
810        self._request.server_terminated = True
811
812    def send_ping(self, body=''):
813        """Overrides Stream.send_ping"""
814
815        self._logger.debug('Sending ping on logical channel %d: %r' %
816                           (self._request.channel_id, body))
817        self._write_inner_frame(common.OPCODE_PING, body, end=True)
818
819        self._ping_queue.append(body)
820
821    def _send_pong(self, body):
822        """Overrides Stream._send_pong"""
823
824        self._logger.debug('Sending pong on logical channel %d: %r' %
825                           (self._request.channel_id, body))
826        self._write_inner_frame(common.OPCODE_PONG, body, end=True)
827
828    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
829        """Overrides Stream.close_connection."""
830
831        # TODO(bashi): Implement
832        self._logger.debug('Closing logical connection %d' %
833                           self._request.channel_id)
834        self._request.server_terminated = True
835
836    def _drain_received_data(self):
837        """Overrides Stream._drain_received_data. Nothing need to be done for
838        logical channel.
839        """
840
841        pass
842
843
844class _OutgoingData(object):
845    """A structure that holds data to be sent via physical connection and
846    origin of the data.
847    """
848
849    def __init__(self, channel_id, data):
850        self.channel_id = channel_id
851        self.data = data
852
853
854class _PhysicalConnectionWriter(threading.Thread):
855    """A thread that is responsible for writing data to physical connection.
856
857    TODO(bashi): Make sure there is no thread-safety problem when the reader
858    thread reads data from the same socket at a time.
859    """
860
861    def __init__(self, mux_handler):
862        """Constructs an instance.
863
864        Args:
865            mux_handler: _MuxHandler instance.
866        """
867
868        threading.Thread.__init__(self)
869        self._logger = util.get_class_logger(self)
870        self._mux_handler = mux_handler
871        self.setDaemon(True)
872        self._stop_requested = False
873        self._deque = collections.deque()
874        self._deque_condition = threading.Condition()
875
876    def put_outgoing_data(self, data):
877        """Puts outgoing data.
878
879        Args:
880            data: _OutgoingData instance.
881
882        Raises:
883            BadOperationException: when the thread has been requested to
884                terminate.
885        """
886
887        try:
888            self._deque_condition.acquire()
889            if self._stop_requested:
890                raise BadOperationException('Cannot write data anymore')
891
892            self._deque.append(data)
893            self._deque_condition.notify()
894        finally:
895            self._deque_condition.release()
896
897    def _write_data(self, outgoing_data):
898        try:
899            self._mux_handler.physical_connection.write(outgoing_data.data)
900        except Exception, e:
901            util.prepend_message_to_exception(
902                'Failed to send message to %r: ' %
903                (self._mux_handler.physical_connection.remote_addr,), e)
904            raise
905
906        # TODO(bashi): It would be better to block the thread that sends
907        # control data as well.
908        if outgoing_data.channel_id != _CONTROL_CHANNEL_ID:
909            self._mux_handler.notify_write_done(outgoing_data.channel_id)
910
911    def run(self):
912        self._deque_condition.acquire()
913        while not self._stop_requested:
914            if len(self._deque) == 0:
915                self._deque_condition.wait()
916                continue
917
918            outgoing_data = self._deque.popleft()
919            self._deque_condition.release()
920            self._write_data(outgoing_data)
921            self._deque_condition.acquire()
922
923        # Flush deque
924        try:
925            while len(self._deque) > 0:
926                outgoing_data = self._deque.popleft()
927                self._write_data(outgoing_data)
928        finally:
929            self._deque_condition.release()
930
931    def stop(self):
932        """Stops the writer thread."""
933
934        self._deque_condition.acquire()
935        self._stop_requested = True
936        self._deque_condition.notify()
937        self._deque_condition.release()
938
939
940class _PhysicalConnectionReader(threading.Thread):
941    """A thread that is responsible for reading data from physical connection.
942    """
943
944    def __init__(self, mux_handler):
945        """Constructs an instance.
946
947        Args:
948            mux_handler: _MuxHandler instance.
949        """
950
951        threading.Thread.__init__(self)
952        self._logger = util.get_class_logger(self)
953        self._mux_handler = mux_handler
954        self.setDaemon(True)
955
956    def run(self):
957        while True:
958            try:
959                physical_stream = self._mux_handler.physical_stream
960                message = physical_stream.receive_message()
961                if message is None:
962                    break
963                opcode = physical_stream.get_last_received_opcode()
964                if opcode == common.OPCODE_TEXT:
965                    raise MuxUnexpectedException(
966                        'Received a text message on physical connection')
967            except ConnectionTerminatedException, e:
968                self._logger.debug('%s', e)
969                break
970
971            try:
972                self._mux_handler.dispatch_message(message)
973            except Exception, e:
974                self._logger.debug(traceback.format_exc())
975                break
976
977        self._mux_handler.notify_reader_done()
978
979
class _Worker(threading.Thread):
    """Daemon thread that runs the application handler for one logical
    channel.
    """

    def __init__(self, mux_handler, request):
        """Constructs an instance.

        Args:
            mux_handler: _MuxHandler instance.
            request: _LogicalRequest instance.
        """

        threading.Thread.__init__(self)
        self.setDaemon(True)
        self._request = request
        self._mux_handler = mux_handler
        self._logger = util.get_class_logger(self)

    def run(self):
        """Transfers data for the request through the dispatcher and always
        reports completion to the mux handler afterwards.
        """

        started_line = ('Logical channel worker started. (id=%d)' %
                        self._request.channel_id)
        self._logger.debug(started_line)

        handler = self._mux_handler
        try:
            # Non-critical exceptions will be handled by dispatcher.
            handler.dispatcher.transfer_data(self._request)
        finally:
            handler.notify_worker_done(self._request.channel_id)
1007
1008
class _MuxHandshaker(hybi.Handshaker):
    """Opening handshake processor for multiplexing."""

    def __init__(self, request, dispatcher, send_quota, receive_quota):
        """Constructs an instance.

        Args:
            request: _LogicalRequest instance.
            dispatcher: Dispatcher instance (dispatch.Dispatcher).
            send_quota: Initial send quota.
            receive_quota: Initial receive quota.
        """

        hybi.Handshaker.__init__(self, request, dispatcher)
        self._receive_quota = receive_quota
        self._send_quota = send_quota

    def _create_stream(self, stream_options):
        """Override hybi.Handshaker._create_stream.

        Builds a _LogicalStream with the quotas given at construction;
        stream_options is not used.
        """

        log_line = ('Creating logical stream for %d' %
                    self._request.channel_id)
        self._logger.debug(log_line)
        stream = _LogicalStream(self._request, self._send_quota,
                                self._receive_quota)
        return stream

    def _send_handshake(self, accept):
        """Override hybi.Handshaker._send_handshake.

        Sends the handshake response wrapped in an AddChannelResponse as
        control data.
        """

        channel_id = self._request.channel_id

        # Don't send handshake response for the default channel
        if channel_id == _DEFAULT_CHANNEL_ID:
            return

        response = self._create_handshake_response(accept)
        encoded = _create_add_channel_response(channel_id, response)
        log_line = ('Sending handshake response for %d: %r' %
                    (channel_id, encoded))
        self._logger.debug(log_line)
        self._request.connection.write_control_data(encoded)
1047
1048
1049class _LogicalChannelData(object):
1050    """A structure that holds information about logical channel.
1051    """
1052
1053    def __init__(self, request, worker):
1054        self.request = request
1055        self.worker = worker
1056        self.mux_error_occurred = False
1057        self.mux_error_reason = ''
1058
1059
1060class _MuxHandler(object):
1061    """Multiplexing handler. When a handler starts, it launches three
1062    threads; the reader thread, the writer thread, and a worker thread.
1063
1064    The reader thread reads data from the physical stream, i.e., the
1065    ws_stream object of the underlying websocket connection. The reader
1066    thread interprets multiplexed frames and dispatches them to logical
1067    channels. Methods of this class are mostly called by the reader thread.
1068
1069    The writer thread sends multiplexed frames which are created by
1070    logical channels via the physical connection.
1071
1072    The worker thread launched at the starting point handles the
1073    "Implicitly Opened Connection". If multiplexing handler receives
1074    an AddChannelRequest and accepts it, the handler will launch a new worker
1075    thread and dispatch the request to it.
1076    """
1077
1078    def __init__(self, request, dispatcher):
1079        """Constructs an instance.
1080
1081        Args:
1082            request: mod_python request of the physical connection.
1083            dispatcher: Dispatcher instance (dispatch.Dispatcher).
1084        """
1085
1086        self.original_request = request
1087        self.dispatcher = dispatcher
1088        self.physical_connection = request.connection
1089        self.physical_stream = request.ws_stream
1090        self._logger = util.get_class_logger(self)
1091        self._logical_channels = {}
1092        self._logical_channels_condition = threading.Condition()
1093        # Holds client's initial quota
1094        self._channel_slots = collections.deque()
1095        self._worker_done_notify_received = False
1096        self._reader = None
1097        self._writer = None
1098
1099    def start(self):
1100        """Starts the handler.
1101
1102        Raises:
1103            MuxUnexpectedException: when the handler already started, or when
1104                opening handshake of the default channel fails.
1105        """
1106
1107        if self._reader or self._writer:
1108            raise MuxUnexpectedException('MuxHandler already started')
1109
1110        self._reader = _PhysicalConnectionReader(self)
1111        self._writer = _PhysicalConnectionWriter(self)
1112        self._reader.start()
1113        self._writer.start()
1114
1115        # Create "Implicitly Opened Connection".
1116        logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID)
1117        headers_in = copy.copy(self.original_request.headers_in)
1118        # TODO(bashi): Support extensions
1119        headers_in['Sec-WebSocket-Extensions'] = ''
1120        logical_request = _LogicalRequest(_DEFAULT_CHANNEL_ID,
1121                                          self.original_request.method,
1122                                          self.original_request.uri,
1123                                          headers_in,
1124                                          logical_connection)
1125        # Client's send quota for the implicitly opened connection is zero,
1126        # but we will send FlowControl later so set the initial quota to
1127        # _INITIAL_QUOTA_FOR_CLIENT.
1128        self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT)
1129        if not self._do_handshake_for_logical_request(
1130            logical_request, send_quota=self.original_request.mux_quota):
1131            raise MuxUnexpectedException(
1132                'Failed handshake on the default channel id')
1133        self._add_logical_channel(logical_request)
1134
1135        # Send FlowControl for the implicitly opened connection.
1136        frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID,
1137                                          _INITIAL_QUOTA_FOR_CLIENT)
1138        logical_request.connection.write_control_data(frame_data)
1139
1140    def add_channel_slots(self, slots, send_quota):
1141        """Adds channel slots.
1142
1143        Args:
1144            slots: number of slots to be added.
1145            send_quota: initial send quota for slots.
1146        """
1147
1148        self._channel_slots.extend([send_quota] * slots)
1149        # Send NewChannelSlot to client.
1150        frame_data = _create_new_channel_slot(slots, send_quota)
1151        self.send_control_data(frame_data)
1152
1153    def wait_until_done(self, timeout=None):
1154        """Waits until all workers are done. Returns False when timeout has
1155        occurred. Returns True on success.
1156
1157        Args:
1158            timeout: timeout in sec.
1159        """
1160
1161        self._logical_channels_condition.acquire()
1162        try:
1163            while len(self._logical_channels) > 0:
1164                self._logger.debug('Waiting workers(%d)...' %
1165                                   len(self._logical_channels))
1166                self._worker_done_notify_received = False
1167                self._logical_channels_condition.wait(timeout)
1168                if not self._worker_done_notify_received:
1169                    self._logger.debug('Waiting worker(s) timed out')
1170                    return False
1171
1172        finally:
1173            self._logical_channels_condition.release()
1174
1175        # Flush pending outgoing data
1176        self._writer.stop()
1177        self._writer.join()
1178
1179        return True
1180
1181    def notify_write_done(self, channel_id):
1182        """Called by the writer thread when a write operation has done.
1183
1184        Args:
1185            channel_id: objective channel id.
1186        """
1187
1188        try:
1189            self._logical_channels_condition.acquire()
1190            if channel_id in self._logical_channels:
1191                channel_data = self._logical_channels[channel_id]
1192                channel_data.request.connection.notify_write_done()
1193            else:
1194                self._logger.debug('Seems that logical channel for %d has gone'
1195                                   % channel_id)
1196        finally:
1197            self._logical_channels_condition.release()
1198
1199    def send_control_data(self, data):
1200        """Sends data via the control channel.
1201
1202        Args:
1203            data: data to be sent.
1204        """
1205
1206        self._writer.put_outgoing_data(_OutgoingData(
1207                channel_id=_CONTROL_CHANNEL_ID, data=data))
1208
1209    def send_data(self, channel_id, data):
1210        """Sends data via given logical channel. This method is called by
1211        worker threads.
1212
1213        Args:
1214            channel_id: objective channel id.
1215            data: data to be sent.
1216        """
1217
1218        self._writer.put_outgoing_data(_OutgoingData(
1219                channel_id=channel_id, data=data))
1220
1221    def _send_drop_channel(self, channel_id, reason='', mux_error=False):
1222        frame_data = _create_drop_channel(channel_id, reason, mux_error)
1223        self._logger.debug(
1224            'Sending drop channel for channel id %d' % channel_id)
1225        self.send_control_data(frame_data)
1226
1227    def _send_error_add_channel_response(self, channel_id, status=None):
1228        if status is None:
1229            status = common.HTTP_STATUS_BAD_REQUEST
1230
1231        if status in _HTTP_BAD_RESPONSE_MESSAGES:
1232            message = _HTTP_BAD_RESPONSE_MESSAGES[status]
1233        else:
1234            self._logger.debug('Response message for %d is not found' % status)
1235            message = '???'
1236
1237        response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message)
1238        frame_data = _create_add_channel_response(channel_id,
1239                                                  encoded_handshake=response,
1240                                                  encoding=0, rejected=True)
1241        self.send_control_data(frame_data)
1242
1243    def _create_logical_request(self, block):
1244        if block.channel_id == _CONTROL_CHANNEL_ID:
1245            raise MuxUnexpectedException(
1246                'Received the control channel id (0) as objective channel '
1247                'id for AddChannel')
1248
1249        if block.encoding != 0:
1250            raise MuxNotImplementedException(
1251                'delta-encoding not supported yet')
1252        connection = _LogicalConnection(self, block.channel_id)
1253        command, path, version, headers = _parse_request_text(
1254                                              block.encoded_handshake)
1255        request = _LogicalRequest(block.channel_id, command, path,
1256                                  headers, connection)
1257
1258        return request
1259
1260    def _do_handshake_for_logical_request(self, request, send_quota=0):
1261        try:
1262            receive_quota = self._channel_slots.popleft()
1263        except IndexError:
1264            raise MuxUnexpectedException('No room in channel pool')
1265
1266        handshaker = _MuxHandshaker(request, self.dispatcher,
1267                                    send_quota, receive_quota)
1268        try:
1269            handshaker.do_handshake()
1270        except handshake.VersionException, e:
1271            self._logger.info('%s', e)
1272            self._send_error_add_channel_response(
1273                block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
1274            return False
1275        except handshake.HandshakeException, e:
1276            self._logger.info('%s', e)
1277            self._send_error_add_channel_response(request.channel_id,
1278                                                  status=e.status)
1279            return False
1280        except handshake.AbortedByUserException, e:
1281            self._logger.info('%s', e)
1282            self._send_error_add_channel_response(request.channel_id)
1283            return False
1284
1285        return True
1286
1287    def _add_logical_channel(self, logical_request):
1288        try:
1289            self._logical_channels_condition.acquire()
1290            if logical_request.channel_id in self._logical_channels:
1291                raise MuxUnexpectedException('Channel id %d already exists' %
1292                                             logical_request.channel_id)
1293            worker = _Worker(self, logical_request)
1294            channel_data = _LogicalChannelData(logical_request, worker)
1295            self._logical_channels[logical_request.channel_id] = channel_data
1296            worker.start()
1297        finally:
1298            self._logical_channels_condition.release()
1299
1300    def _process_add_channel_request(self, block):
1301        try:
1302            logical_request = self._create_logical_request(block)
1303        except ValueError, e:
1304            self._logger.debug('Failed to create logical request: %r' % e)
1305            self._send_error_add_channel_response(
1306                block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
1307            return
1308        if self._do_handshake_for_logical_request(logical_request):
1309            self._add_logical_channel(logical_request)
1310        else:
1311            self._send_error_add_channel_response(
1312                block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
1313
1314    def _process_flow_control(self, block):
1315        try:
1316            self._logical_channels_condition.acquire()
1317            if not block.channel_id in self._logical_channels:
1318                return
1319            channel_data = self._logical_channels[block.channel_id]
1320            channel_data.request.ws_stream.replenish_send_quota(
1321                block.send_quota)
1322        finally:
1323            self._logical_channels_condition.release()
1324
1325    def _process_drop_channel(self, block):
1326        self._logger.debug('DropChannel received for %d: reason=%r' %
1327                           (block.channel_id, block.reason))
1328        try:
1329            self._logical_channels_condition.acquire()
1330            if not block.channel_id in self._logical_channels:
1331                return
1332            channel_data = self._logical_channels[block.channel_id]
1333            if not block.mux_error:
1334                channel_data.request.connection.set_read_state(
1335                    _LogicalConnection.STATE_TERMINATED)
1336            else:
1337                # TODO(bashi): What should we do?
1338                channel_data.request.connection.set_read_state(
1339                    _LogicalConnection.STATE_TERMINATED)
1340        finally:
1341            self._logical_channels_condition.release()
1342
1343    def _process_new_channel_slot(self, block):
1344        raise MuxUnexpectedException('Client should not send NewChannelSlot')
1345
1346    def _process_control_blocks(self, parser):
1347        for control_block in parser.read_control_blocks():
1348            opcode = control_block.opcode
1349            self._logger.debug('control block received, opcode: %d' % opcode)
1350            if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
1351                self._process_add_channel_request(control_block)
1352            elif opcode == _MUX_OPCODE_FLOW_CONTROL:
1353                self._process_flow_control(control_block)
1354            elif opcode == _MUX_OPCODE_DROP_CHANNEL:
1355                self._process_drop_channel(control_block)
1356            elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
1357                self._process_new_channel_slot(control_block)
1358            else:
1359                raise InvalidMuxControlBlockException(
1360                    'Invalid opcode')
1361
1362    def _process_logical_frame(self, channel_id, parser):
1363        self._logger.debug('Received a frame. channel id=%d' % channel_id)
1364        try:
1365            self._logical_channels_condition.acquire()
1366            if not channel_id in self._logical_channels:
1367                raise MuxUnexpectedException(
1368                    'Channel id %d not found' % channel_id)
1369            channel_data = self._logical_channels[channel_id]
1370            fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame()
1371            if not channel_data.request.ws_stream.consume_receive_quota(
1372                len(payload)):
1373                # The client violates quota. Close logical channel.
1374                channel_data.mux_error_occurred = True
1375                channel_data.mux_error_reason = 'Quota violation'
1376                channel_data.request.connection.set_read_state(
1377                    _LogicalConnection.STATE_TERMINATED)
1378                return
1379            header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3,
1380                                   mask=False)
1381            frame_data = header + payload
1382            channel_data.request.connection.append_frame_data(frame_data)
1383        finally:
1384            self._logical_channels_condition.release()
1385
1386    def dispatch_message(self, message):
1387        """Dispatches message. The reader thread calls this method.
1388
1389        Args:
1390            message: a message that contains encapsulated frame.
1391        Raises:
1392            InvalidMuxFrameException: if the message is invalid.
1393        """
1394
1395        parser = _MuxFramePayloadParser(message)
1396        channel_id = parser.read_channel_id()
1397        if channel_id == _CONTROL_CHANNEL_ID:
1398            self._process_control_blocks(parser)
1399        else:
1400            self._process_logical_frame(channel_id, parser)
1401
1402    def notify_worker_done(self, channel_id):
1403        """Called when a worker has finished.
1404
1405        Args:
1406            channel_id: channel id corresponded with the worker.
1407        """
1408
1409        self._logger.debug('Worker for channel id %d terminated' % channel_id)
1410        try:
1411            self._logical_channels_condition.acquire()
1412            if not channel_id in self._logical_channels:
1413                raise MuxUnexpectedException(
1414                    'Channel id %d not found' % channel_id)
1415            channel_data = self._logical_channels.pop(channel_id)
1416        finally:
1417            self._worker_done_notify_received = True
1418            self._logical_channels_condition.notify()
1419            self._logical_channels_condition.release()
1420
1421        if not channel_data.request.server_terminated:
1422            if channel_data.mux_error_occurred:
1423                self._send_drop_channel(
1424                    channel_id, reason=channel_data.mux_error_reason,
1425                    mux_error=True)
1426            else:
1427                self._send_drop_channel(channel_id)
1428
1429    def notify_reader_done(self):
1430        """This method is called by the reader thread when the reader has
1431        finished.
1432        """
1433
1434        # Terminate all logical connections
1435        self._logger.debug('termiating all logical connections...')
1436        self._logical_channels_condition.acquire()
1437        for channel_data in self._logical_channels.values():
1438            try:
1439                channel_data.request.connection.set_read_state(
1440                    _LogicalConnection.STATE_TERMINATED)
1441            except Exception:
1442                pass
1443        self._logical_channels_condition.release()
1444
1445
def use_mux(request):
    """Returns the request's mux attribute, or False when the request has
    no such attribute.
    """

    if not hasattr(request, 'mux'):
        return False
    return request.mux
1448
1449
def start(request, dispatcher):
    """Runs a multiplexing session on the given request.

    Creates and starts a _MuxHandler, offers the initial channel slots to
    the client and then blocks until the handler reports that it is done.

    Args:
        request: request of the physical connection.
        dispatcher: Dispatcher instance handed to the _MuxHandler.
    """

    handler = _MuxHandler(request, dispatcher)
    handler.start()

    handler.add_channel_slots(slots=_INITIAL_NUMBER_OF_CHANNEL_SLOTS,
                              send_quota=_INITIAL_QUOTA_FOR_CLIENT)

    handler.wait_until_done()
1458
1459
1460# vi:sts=4 sw=4 et
1461