#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

__all__ = [ 'Client', 'Listener', 'Pipe' ]

import os
import sys
import socket
import errno
import time
import tempfile
import itertools
import hashlib

import _multiprocessing
from multiprocessing import current_process, AuthenticationError
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
from multiprocessing.forking import duplicate, close


#
#
#

BUFSIZE = 8192
# A very generous timeout when it comes to local connections...
CONNECTION_TIMEOUT = 20.
# Counter used to make generated named-pipe addresses unique per process.
_mmap_counter = itertools.count()

# Every platform has AF_INET; the preferred default is upgraded below when
# a better local transport is available.
default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']


def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return the deadline (a `time.time()` value) `timeout` seconds from now
    '''
    return time.time() + timeout

def _check_timeout(t):
    '''
    Return True if the current `time.time()` is later than deadline `t`
    '''
    return time.time() > t

#
#
#

def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family

    `family` must be 'AF_INET', 'AF_UNIX' or 'AF_PIPE'; any other value
    raises ValueError.
    '''
    if family == 'AF_INET':
        # Port 0 is passed through unchanged to whoever binds the address.
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        # Only a name is wanted here (the caller binds a socket to it), so
        # mktemp() is used rather than a function that creates the file.
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
    elif family == 'AF_PIPE':
        # next() builtin instead of the Python-2-only iterator.next() method.
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), next(_mmap_counter)))
    else:
        raise ValueError('unrecognized family')


def address_type(address):
    '''
    Return the type of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'.  Raises ValueError
    when `address` is neither a tuple nor a str.
    '''
    if type(address) is tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        # A str starting with two backslashes is taken to be a pipe name.
        return 'AF_PIPE'
    elif type(address) is str:
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)

#
# Public functions
#

class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        '''
        Bind to `address` (or an arbitrary one) using `family`.

        When `family` is not given it is derived from `address`, falling
        back to `default_family`.  Raises TypeError if `authkey` is given
        but is not a byte string.
        '''
        # Validate first: rejecting the key after the socket/pipe has been
        # created would leave that resource open with no owner.
        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError('authkey should be a byte string')

        family = (family or (address and address_type(address))
                  or default_family)
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.  When an authkey is set, the peer
        is challenged and then this side answers the peer's challenge.
        '''
        c = self._listener.accept()
        if self._authkey:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)


def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`

    Raises TypeError if `authkey` is given but is not a byte string.  When
    an authkey is given, the listener's challenge is answered first and
    then the listener is challenged in turn.
    '''
    # Validate before connecting so a bad key never opens a connection.
    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError('authkey should be a byte string')

    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None:
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c


if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        With `duplex` true both ends wrap one side of a socket pair;
        otherwise the first end is created non-writable and the second
        non-readable around an `os.pipe()`.
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            try:
                s1.setblocking(True)
                s2.setblocking(True)
                # The connections own dup'ed descriptors, so the socket
                # objects can be closed whether or not this succeeds.
                c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
                c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            finally:
                s1.close()
                s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2

else:
    from _multiprocessing import win32

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe

        Creates a named pipe at an arbitrary address and opens its other
        end; with `duplex` false the first end is not writable and the
        second is not readable.
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = win32.CreateNamedPipe(
            address, openmode,
            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
            win32.PIPE_WAIT,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        h2 = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            h2, win32.PIPE_READMODE_MESSAGE, None, None
            )

        try:
            win32.ConnectNamedPipe(h1, win32.NULL)
        except WindowsError as e:
            # The client end was opened above, so "already connected" is
            # the expected outcome rather than a failure.
            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                raise

        c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
        c2 = _multiprocessing.PipeConnection(h2, readable=duplex)

        return c1, c2

#
# Definitions for connections based on sockets
#

class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        try:
            self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._socket.setblocking(True)
            self._socket.bind(address)
            self._socket.listen(backlog)
            self._address = self._socket.getsockname()
        except Exception:
            # Close on any setup failure (not only socket.error) so the
            # half-configured socket is never left open; the error is
            # re-raised unchanged.
            self._socket.close()
            raise
        self._family = family
        self._last_accepted = None

        if family == 'AF_UNIX':
            # Callable that unlinks the socket file; invoked from close().
            self._unlink = Finalize(
                self, os.unlink, args=(address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        '''
        Accept one connection and return it as a connection object

        The peer address is recorded in `_last_accepted`.
        '''
        s, self._last_accepted = self._socket.accept()
        try:
            s.setblocking(True)
            fd = duplicate(s.fileno())
        finally:
            # The connection uses the duplicated descriptor, so the socket
            # object is closed on success and on failure alike.
            s.close()
        return _multiprocessing.Connection(fd)

    def close(self):
        '''
        Close the listening socket and run the unlink finalizer, if any
        '''
        self._socket.close()
        if self._unlink is not None:
            self._unlink()


def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`

    A refused connection is retried every 0.01s until the deadline from
    `_init_timeout()` passes; any other socket error is raised at once.
    '''
    family = address_type(address)
    s = socket.socket(getattr(socket, family))
    try:
        s.setblocking(True)
        t = _init_timeout()

        while True:
            try:
                s.connect(address)
            except socket.error as e:
                if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
                    debug('failed to connect to address %s', address)
                    raise
                time.sleep(0.01)
            else:
                break

        fd = duplicate(s.fileno())
    finally:
        # Closed on every path so a failed connect does not leak the socket.
        s.close()
    return _multiprocessing.Connection(fd)

#
# Definitions for connections based on named pipes
#

if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            handle = win32.CreateNamedPipe(
                address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            # One pipe instance is always kept queued for the next client.
            self._handle_queue = [handle]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def accept(self):
            '''
            Wait for a client on the oldest queued pipe instance

            A fresh instance is created and queued before waiting.
            '''
            newhandle = win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue.append(newhandle)
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError as e:
                # ERROR_NO_DATA can occur if a client has already connected,
                # written data and then disconnected -- see Issue 14725.
                if e.args[0] not in (win32.ERROR_PIPE_CONNECTED,
                                     win32.ERROR_NO_DATA):
                    # The handle was already removed from the queue, so
                    # nothing else would ever close it.
                    close(handle)
                    raise
            return _multiprocessing.PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            sub_debug('closing listener with address=%r', address)
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`

        Retries while the pipe is busy or the wait times out, until the
        deadline from `_init_timeout()` passes.
        '''
        t = _init_timeout()
        while True:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError as e:
                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
                                     win32.ERROR_PIPE_BUSY) or _check_timeout(t):
                    raise
            else:
                break

        win32.SetNamedPipeHandleState(
            h, win32.PIPE_READMODE_MESSAGE, None, None
            )
        return _multiprocessing.PipeConnection(h)

#
# Authentication stuff
#

MESSAGE_LENGTH = 20

CHALLENGE = b'#CHALLENGE#'
WELCOME = b'#WELCOME#'
FAILURE = b'#FAILURE#'

def deliver_challenge(connection, authkey):
    '''
    Challenge the peer on `connection` to prove it knows `authkey`

    Sends CHALLENGE plus MESSAGE_LENGTH random bytes and expects the
    HMAC-MD5 digest of those bytes in reply.  Sends WELCOME on a match;
    otherwise sends FAILURE and raises AuthenticationError.  Raises
    AssertionError if `authkey` is not a byte string.
    '''
    import hmac
    # Explicit raise rather than `assert` so the check survives `python -O`.
    if not isinstance(authkey, bytes):
        raise AssertionError('authkey should be a byte string')
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    # MD5 was the implicit hmac default this protocol was written against;
    # naming it keeps the digest the same where no default exists.
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    response = connection.recv_bytes(256)        # reject large message
    # Prefer a constant-time comparison when this interpreter provides one.
    compare = getattr(hmac, 'compare_digest', None)
    if compare is not None:
        matched = compare(response, digest)
    else:
        matched = response == digest
    if matched:
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    '''
    Answer a challenge received on `connection` using `authkey`

    Replies with the HMAC-MD5 digest of the challenge payload and raises
    AuthenticationError unless the peer then sends WELCOME.  Raises
    AssertionError if `authkey` is not a byte string or the received
    message does not start with CHALLENGE.
    '''
    import hmac
    # Explicit raises rather than `assert` so the checks survive `python -O`.
    if not isinstance(authkey, bytes):
        raise AssertionError('authkey should be a byte string')
    message = connection.recv_bytes(256)         # reject large message
    if message[:len(CHALLENGE)] != CHALLENGE:
        raise AssertionError('message = %r' % message)
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message, hashlib.md5).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')

#
# Support for using xmlrpclib for serialization
#
class ConnectionWrapper(object):
    '''
    Wrap a connection so `send`/`recv` use the given `dumps`/`loads`

    The wrapped connection's `fileno`, `close`, `poll`, `recv_bytes` and
    `send_bytes` are exposed unchanged on the wrapper.
    '''
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)
    def send(self, obj):
        '''Serialize `obj` with `dumps` and send the resulting bytes'''
        s = self._dumps(obj)
        self._conn.send_bytes(s)
    def recv(self):
        '''Receive bytes and return them deserialized with `loads`'''
        s = self._conn.recv_bytes()
        return self._loads(s)

def _load_xmlrpclib():
    '''
    Bind the module-level name `xmlrpclib` used by the XML helpers

    Falls back to `xmlrpc.client` when no module named `xmlrpclib` can be
    imported.
    '''
    global xmlrpclib
    try:
        import xmlrpclib
    except ImportError:
        import xmlrpc.client as xmlrpclib

def _xml_dumps(obj):
    # Requires `xmlrpclib` to have been bound by _load_xmlrpclib().
    # The positional 1 is passed in the allow_none slot.
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')

def _xml_loads(s):
    # Requires `xmlrpclib` to have been bound by _load_xmlrpclib().
    (obj,), method = xmlrpclib.loads(s.decode('utf8'))
    return obj

class XmlListener(Listener):
    '''
    Listener whose accepted connections serialize objects as XML-RPC
    '''
    def accept(self):
        _load_xmlrpclib()
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)

def XmlClient(*args, **kwds):
    '''
    Like `Client`, but the returned connection serializes as XML-RPC
    '''
    _load_xmlrpclib()
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)