#
# We use a background thread for sharing fds on Unix, and for sharing sockets on
# Windows.
#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return.  The unpickling process will connect
# to the resource sharer, send the identifier and its pid, and then receive
# the resource.
#

import os
import signal
import socket
import sys
import threading

from . import process
from .context import reduction
from . import util

__all__ = ['stop']


if sys.platform == 'win32':
    __all__ += ['DupSocket']

    class DupSocket(object):
        '''Picklable wrapper for a socket.'''
        def __init__(self, sock):
            '''Duplicate *sock* and register the duplicate with the sharer.'''
            new_sock = sock.dup()
            def send(conn, pid):
                # Serialise the duplicated socket for process *pid* and ship
                # the resulting bytes over *conn*.
                share = new_sock.share(pid)
                conn.send_bytes(share)
            try:
                self._id = _resource_sharer.register(send, new_sock.close)
            except BaseException:
                # Registration failed, so nothing else will ever close the
                # duplicate: release it here before propagating the error.
                new_sock.close()
                raise

        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)

else:
    __all__ += ['DupFd']

    class DupFd(object):
        '''Wrapper for fd which can be used at any time.'''
        def __init__(self, fd):
            '''Duplicate *fd* and register the duplicate with the sharer.'''
            new_fd = os.dup(fd)
            def send(conn, pid):
                reduction.send_handle(conn, new_fd, pid)
            def close():
                os.close(new_fd)
            try:
                self._id = _resource_sharer.register(send, close)
            except BaseException:
                # Registration failed, so nothing else will ever close the
                # duplicate: release it here before propagating the error.
                close()
                raise

        def detach(self):
            '''Get the fd.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)


class _ResourceSharer(object):
    '''Manager for resources using background thread.'''
    def __init__(self):
        self._key = 0               # last identifier handed out by register()
        self._cache = {}            # key -> (send, close) callables
        self._old_locks = []        # locks replaced after fork, kept alive
        self._lock = threading.Lock()
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.

        *send(conn, pid)* is called by the background thread to transfer the
        resource over *conn* to process *pid*; *close()* is called afterwards
        (or when the sharer is stopped / forked) to release it.  The listener
        and its thread are started on first use.  The identifier returned is
        the tuple ``(address, key)`` expected by get_connection().
        '''
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.

        *ident* is an ``(address, key)`` tuple as returned by register().
        The key and the pid of the calling process are sent as the first
        message on the new connection.
        '''
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        try:
            c.send((key, os.getpid()))
        except BaseException:
            # The caller never sees the connection if the request cannot be
            # sent, so close it here instead of leaking it.
            c.close()
            raise
        return c

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.

        *timeout* is passed to the thread's join(); a warning is emitted via
        util.sub_warning() if the thread is still alive afterwards.
        '''
        from .connection import Client
        with self._lock:
            if self._address is not None:
                # A None message is what _serve() treats as "shut down".
                c = Client(self._address,
                           authkey=process.current_process().authkey)
                c.send(None)
                c.close()
                self._thread.join(timeout)
                if self._thread.is_alive():
                    util.sub_warning('_ResourceSharer thread did '
                                     'not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                # Release every resource that was registered but never
                # collected.
                for _send, close in self._cache.values():
                    close()
                self._cache.clear()

    def _afterfork(self):
        '''Reset state in a forked child (registered via register_after_fork).'''
        for _send, close in self._cache.values():
            close()
        self._cache.clear()
        # If self._lock was locked at the time of the fork, it may be broken
        # -- see issue 6721.  Replace it without letting it be gc'ed.
        self._old_locks.append(self._lock)
        self._lock = threading.Lock()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        '''Create the listener and start the daemon thread serving it.'''
        from .connection import Listener
        assert self._listener is None
        util.debug('starting listener and thread for sending handles')
        self._listener = Listener(authkey=process.current_process().authkey)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        '''Thread body: answer resource requests until told to stop.

        Each accepted connection must deliver either None (shut down) or a
        ``(key, pid)`` pair; the matching resource is removed from the cache,
        sent to the requester and then closed.
        '''
        if hasattr(signal, 'pthread_sigmask'):
            # Block signals in this thread.  Prefer valid_signals() where it
            # exists: range(1, NSIG) may include numbers the platform
            # reserves, which pthread_sigmask() can refuse.
            if hasattr(signal, 'valid_signals'):
                blocked = signal.valid_signals()
            else:
                blocked = range(1, signal.NSIG)
            signal.pthread_sigmask(signal.SIG_BLOCK, blocked)
        while True:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                # Deliberately broad: a failure while serving one request is
                # reported through sys.excepthook and the loop keeps running.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())


_resource_sharer = _ResourceSharer()
stop = _resource_sharer.stop