10a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
20a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# A higher level module for using sockets (or Windows named pipes)
30a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
40a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# multiprocessing/connection.py
50a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
60a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Copyright (c) 2006-2008, R Oudkerk
70a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# All rights reserved.
80a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
90a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Redistribution and use in source and binary forms, with or without
100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# modification, are permitted provided that the following conditions
110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# are met:
120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 1. Redistributions of source code must retain the above copyright
140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    notice, this list of conditions and the following disclaimer.
150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 2. Redistributions in binary form must reproduce the above copyright
160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    notice, this list of conditions and the following disclaimer in the
170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    documentation and/or other materials provided with the distribution.
180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 3. Neither the name of author nor the names of any contributors may be
190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    used to endorse or promote products derived from this software
200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    without specific prior written permission.
210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# SUCH DAMAGE.
330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao__all__ = [ 'Client', 'Listener', 'Pipe' ]
360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
370a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport os
380a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport sys
390a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport socket
400a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport errno
410a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport time
420a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport tempfile
430a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport itertools
440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
450a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport _multiprocessing
460a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom multiprocessing import current_process, AuthenticationError
470a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
480a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom multiprocessing.forking import duplicate, close
490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
550a8c90248264a8b26970b4473770bcc3df8515fJosh GaoBUFSIZE = 8192
560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# A very generous timeout when it comes to local connections...
570a8c90248264a8b26970b4473770bcc3df8515fJosh GaoCONNECTION_TIMEOUT = 20.
580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao_mmap_counter = itertools.count()
600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
610a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodefault_family = 'AF_INET'
620a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofamilies = ['AF_INET']
630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
640a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoif hasattr(socket, 'AF_UNIX'):
650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    default_family = 'AF_UNIX'
660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    families += ['AF_UNIX']
670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
680a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoif sys.platform == 'win32':
690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    default_family = 'AF_PIPE'
700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    families += ['AF_PIPE']
710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
730a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef _init_timeout(timeout=CONNECTION_TIMEOUT):
740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return time.time() + timeout
750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
760a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef _check_timeout(t):
770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return time.time() > t
780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
830a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef arbitrary_address(family):
840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Return an arbitrary free address for the given family
860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if family == 'AF_INET':
880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return ('localhost', 0)
890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    elif family == 'AF_UNIX':
900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    elif family == 'AF_PIPE':
920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                               (os.getpid(), _mmap_counter.next()))
940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    else:
950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise ValueError('unrecognized family')
960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
980a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef address_type(address):
990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
1000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Return the types of the address
1010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
1030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
1040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if type(address) == tuple:
1050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return 'AF_INET'
1060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    elif type(address) is str and address.startswith('\\\\'):
1070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return 'AF_PIPE'
1080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    elif type(address) is str:
1090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return 'AF_UNIX'
1100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    else:
1110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise ValueError('address type of %r unrecognized' % address)
1120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Public functions
1150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1170a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass Listener(object):
1180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
1190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Returns a listener object.
1200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    This is a wrapper for a bound socket which is 'listening' for
1220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    connections, or for a Windows named pipe.
1230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
1240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, address=None, family=None, backlog=1, authkey=None):
1250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        family = family or (address and address_type(address)) \
1260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                 or default_family
1270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        address = address or arbitrary_address(family)
1280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if family == 'AF_PIPE':
1300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._listener = PipeListener(address, backlog)
1310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
1320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._listener = SocketListener(address, family, backlog)
1330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if authkey is not None and not isinstance(authkey, bytes):
1350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            raise TypeError, 'authkey should be a byte string'
1360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._authkey = authkey
1380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def accept(self):
1400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
1410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Accept a connection on the bound socket or named pipe of `self`.
1420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Returns a `Connection` object.
1440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
1450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        c = self._listener.accept()
1460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self._authkey:
1470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            deliver_challenge(c, self._authkey)
1480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            answer_challenge(c, self._authkey)
1490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return c
1500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def close(self):
1520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
1530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Close the bound socket or named pipe of `self`.
1540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
1550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self._listener.close()
1560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    address = property(lambda self: self._listener._address)
1580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    last_accepted = property(lambda self: self._listener._last_accepted)
1590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1610a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef Client(address, family=None, authkey=None):
1620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
1630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Returns a connection to the address of a `Listener`
1640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
1650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    family = family or address_type(address)
1660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if family == 'AF_PIPE':
1670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        c = PipeClient(address)
1680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    else:
1690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        c = SocketClient(address)
1700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if authkey is not None and not isinstance(authkey, bytes):
1720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise TypeError, 'authkey should be a byte string'
1730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if authkey is not None:
1750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        answer_challenge(c, authkey)
1760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        deliver_challenge(c, authkey)
1770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return c
1790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1810a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoif sys.platform != 'win32':
1820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def Pipe(duplex=True):
1840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
1850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Returns pair of connection objects at either end of a pipe
1860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
1870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if duplex:
1880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            s1, s2 = socket.socketpair()
1890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            s1.setblocking(True)
1900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            s2.setblocking(True)
1910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
1920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
1930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            s1.close()
1940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            s2.close()
1950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
1960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            fd1, fd2 = os.pipe()
1970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            c1 = _multiprocessing.Connection(fd1, writable=False)
1980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            c2 = _multiprocessing.Connection(fd2, readable=False)
1990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return c1, c2
2010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2020a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoelse:
2030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    from _multiprocessing import win32
2040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def Pipe(duplex=True):
2060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Returns pair of connection objects at either end of a pipe
2080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        address = arbitrary_address('AF_PIPE')
2100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if duplex:
2110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            openmode = win32.PIPE_ACCESS_DUPLEX
2120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            access = win32.GENERIC_READ | win32.GENERIC_WRITE
2130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            obsize, ibsize = BUFSIZE, BUFSIZE
2140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
2150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            openmode = win32.PIPE_ACCESS_INBOUND
2160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            access = win32.GENERIC_WRITE
2170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            obsize, ibsize = 0, BUFSIZE
2180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        h1 = win32.CreateNamedPipe(
2200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address, openmode,
2210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
2220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            win32.PIPE_WAIT,
2230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
2240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
2250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        h2 = win32.CreateFile(
2260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
2270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
2280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        win32.SetNamedPipeHandleState(
2290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            h2, win32.PIPE_READMODE_MESSAGE, None, None
2300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
2310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
2330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            win32.ConnectNamedPipe(h1, win32.NULL)
2340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except WindowsError, e:
2350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
2360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                raise
2370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
2390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        c2 = _multiprocessing.PipeConnection(h2, readable=duplex)
2400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return c1, c2
2420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
2440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Definitions for connections based on sockets
2450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
2460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2470a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass SocketListener(object):
2480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
2490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Representation of a socket which is bound to an address and listening
2500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
2510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, address, family, backlog=1):
2520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._socket = socket.socket(getattr(socket, family))
2530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
2540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
2550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._socket.setblocking(True)
2560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._socket.bind(address)
2570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._socket.listen(backlog)
2580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._address = self._socket.getsockname()
2590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except socket.error:
2600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._socket.close()
2610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            raise
2620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._family = family
2630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._last_accepted = None
2640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if family == 'AF_UNIX':
2660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._unlink = Finalize(
2670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self, os.unlink, args=(address,), exitpriority=0
2680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                )
2690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
2700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._unlink = None
2710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def accept(self):
2730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        s, self._last_accepted = self._socket.accept()
2740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        s.setblocking(True)
2750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        fd = duplicate(s.fileno())
2760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        conn = _multiprocessing.Connection(fd)
2770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        s.close()
2780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return conn
2790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def close(self):
2810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._socket.close()
2820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self._unlink is not None:
2830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._unlink()
2840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2860a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef SocketClient(address):
2870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
2880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Return a connection object connected to the socket given by `address`
2890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
2900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    family = address_type(address)
2910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    s = socket.socket( getattr(socket, family) )
2920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    s.setblocking(True)
2930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    t = _init_timeout()
2940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    while 1:
2960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
2970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            s.connect(address)
2980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except socket.error, e:
2990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
3000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                debug('failed to connect to address %s', address)
3010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                raise
3020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            time.sleep(0.01)
3030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
3040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            break
3050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    else:
3060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise
3070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    fd = duplicate(s.fileno())
3090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    conn = _multiprocessing.Connection(fd)
3100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    s.close()
3110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return conn
3120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
3140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Definitions for connections based on named pipes
3150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
3160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3170a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoif sys.platform == 'win32':
3180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    class PipeListener(object):
3200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
3210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Representation of a named pipe
3220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
3230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        def __init__(self, address, backlog=None):
3240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._address = address
3250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            handle = win32.CreateNamedPipe(
3260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                address, win32.PIPE_ACCESS_DUPLEX,
3270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
3280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.PIPE_WAIT,
3290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
3300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
3310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                )
3320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._handle_queue = [handle]
3330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._last_accepted = None
3340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            sub_debug('listener created with address=%r', self._address)
3360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.close = Finalize(
3380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self, PipeListener._finalize_pipe_listener,
3390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                args=(self._handle_queue, self._address), exitpriority=0
3400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                )
3410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        def accept(self):
3430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            newhandle = win32.CreateNamedPipe(
3440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._address, win32.PIPE_ACCESS_DUPLEX,
3450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
3460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.PIPE_WAIT,
3470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
3480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
3490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                )
3500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._handle_queue.append(newhandle)
3510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            handle = self._handle_queue.pop(0)
3520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
3530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.ConnectNamedPipe(handle, win32.NULL)
3540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except WindowsError, e:
3550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                # ERROR_NO_DATA can occur if a client has already connected,
3560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                # written data and then disconnected -- see Issue 14725.
3570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
3580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                                     win32.ERROR_NO_DATA):
3590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    raise
3600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return _multiprocessing.PipeConnection(handle)
3610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        @staticmethod
3630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        def _finalize_pipe_listener(queue, address):
3640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            sub_debug('closing listener with address=%r', address)
3650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for handle in queue:
3660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                close(handle)
3670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def PipeClient(address):
3690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
3700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Return a connection object connected to the pipe given by `address`
3710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
3720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        t = _init_timeout()
3730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        while 1:
3740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
3750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                win32.WaitNamedPipe(address, 1000)
3760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                h = win32.CreateFile(
3770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
3780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
3790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    )
3800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except WindowsError, e:
3810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
3820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                                     win32.ERROR_PIPE_BUSY) or _check_timeout(t):
3830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    raise
3840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            else:
3850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                break
3860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
3870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            raise
3880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        win32.SetNamedPipeHandleState(
3900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            h, win32.PIPE_READMODE_MESSAGE, None, None
3910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
3920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return _multiprocessing.PipeConnection(h)
3930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
3950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Authentication stuff
3960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
3970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3980a8c90248264a8b26970b4473770bcc3df8515fJosh GaoMESSAGE_LENGTH = 20
3990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4000a8c90248264a8b26970b4473770bcc3df8515fJosh GaoCHALLENGE = b'#CHALLENGE#'
4010a8c90248264a8b26970b4473770bcc3df8515fJosh GaoWELCOME = b'#WELCOME#'
4020a8c90248264a8b26970b4473770bcc3df8515fJosh GaoFAILURE = b'#FAILURE#'
4030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4040a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef deliver_challenge(connection, authkey):
4050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    import hmac
4060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    assert isinstance(authkey, bytes)
4070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    message = os.urandom(MESSAGE_LENGTH)
4080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    connection.send_bytes(CHALLENGE + message)
4090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    digest = hmac.new(authkey, message).digest()
4100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    response = connection.recv_bytes(256)        # reject large message
4110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if response == digest:
4120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        connection.send_bytes(WELCOME)
4130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    else:
4140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        connection.send_bytes(FAILURE)
4150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise AuthenticationError('digest received was wrong')
4160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4170a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef answer_challenge(connection, authkey):
4180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    import hmac
4190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    assert isinstance(authkey, bytes)
4200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    message = connection.recv_bytes(256)         # reject large message
4210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
4220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    message = message[len(CHALLENGE):]
4230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    digest = hmac.new(authkey, message).digest()
4240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    connection.send_bytes(digest)
4250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    response = connection.recv_bytes(256)        # reject large message
4260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if response != WELCOME:
4270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise AuthenticationError('digest sent was rejected')
4280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
4300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Support for using xmlrpclib for serialization
4310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
4320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4330a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ConnectionWrapper(object):
4340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, conn, dumps, loads):
4350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._conn = conn
4360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._dumps = dumps
4370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._loads = loads
4380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
4390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            obj = getattr(conn, attr)
4400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            setattr(self, attr, obj)
4410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def send(self, obj):
4420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        s = self._dumps(obj)
4430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._conn.send_bytes(s)
4440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def recv(self):
4450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        s = self._conn.recv_bytes()
4460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self._loads(s)
4470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4480a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef _xml_dumps(obj):
4490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
4500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4510a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef _xml_loads(s):
4520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    (obj,), method = xmlrpclib.loads(s.decode('utf8'))
4530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return obj
4540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4550a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass XmlListener(Listener):
4560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def accept(self):
4570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        global xmlrpclib
4580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        import xmlrpclib
4590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        obj = Listener.accept(self)
4600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
4610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4620a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef XmlClient(*args, **kwds):
4630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    global xmlrpclib
4640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    import xmlrpclib
4650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
466