184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk#
284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk# We use a background thread for sharing fds on Unix, and for sharing sockets on
384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk# Windows.
484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk#
584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk# A client which wants to pickle a resource registers it with the resource
684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk# sharer and gets an identifier in return.  The unpickling process will connect
784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk# to the resource sharer, sends the identifier and its pid, and then receives
884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk# the resource.
984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk#
1084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
1184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkimport os
1284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkimport signal
1384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkimport socket
1484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkimport sys
1584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkimport threading
1684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
1784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkfrom . import process
185458647bb867770fc3d830a618cef6994fdfac4bDavin Pottsfrom .context import reduction
1984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkfrom . import util
2084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
2184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk__all__ = ['stop']
2284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
2384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
2484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkif sys.platform == 'win32':
2584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    __all__ += ['DupSocket']
2684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
2784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    class DupSocket(object):
2884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        '''Picklable wrapper for a socket.'''
2984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        def __init__(self, sock):
3084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            new_sock = sock.dup()
3184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            def send(conn, pid):
3284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                share = new_sock.share(pid)
3384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                conn.send_bytes(share)
3484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            self._id = _resource_sharer.register(send, new_sock.close)
3584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
3684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        def detach(self):
3784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            '''Get the socket.  This should only be called once.'''
3884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            with _resource_sharer.get_connection(self._id) as conn:
3984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                share = conn.recv_bytes()
4084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                return socket.fromshare(share)
4184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
4284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkelse:
4384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    __all__ += ['DupFd']
4484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
4584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    class DupFd(object):
4684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        '''Wrapper for fd which can be used at any time.'''
4784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        def __init__(self, fd):
4884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            new_fd = os.dup(fd)
4984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            def send(conn, pid):
5084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                reduction.send_handle(conn, new_fd, pid)
5184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            def close():
5284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                os.close(new_fd)
5384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            self._id = _resource_sharer.register(send, close)
5484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
5584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        def detach(self):
5684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            '''Get the fd.  This should only be called once.'''
5784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            with _resource_sharer.get_connection(self._id) as conn:
5884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                return reduction.recv_handle(conn)
5984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
6084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
6184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkclass _ResourceSharer(object):
6284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    '''Manager for resouces using background thread.'''
6384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    def __init__(self):
6484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._key = 0
6584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._cache = {}
6684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._old_locks = []
6784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._lock = threading.Lock()
6884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._listener = None
6984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._address = None
7084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._thread = None
7184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        util.register_after_fork(self, _ResourceSharer._afterfork)
7284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
7384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    def register(self, send, close):
7484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        '''Register resource, returning an identifier.'''
7584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        with self._lock:
7684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            if self._address is None:
7784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                self._start()
7884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            self._key += 1
7984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            self._cache[self._key] = (send, close)
8084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            return (self._address, self._key)
8184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
8284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    @staticmethod
8384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    def get_connection(ident):
8484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        '''Return connection from which to receive identified resource.'''
8584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        from .connection import Client
8684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        address, key = ident
8784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        c = Client(address, authkey=process.current_process().authkey)
8884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        c.send((key, os.getpid()))
8984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        return c
9084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
9184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    def stop(self, timeout=None):
9284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        '''Stop the background thread and clear registered resources.'''
9384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        from .connection import Client
9484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        with self._lock:
9584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            if self._address is not None:
9684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                c = Client(self._address,
9784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                           authkey=process.current_process().authkey)
9884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                c.send(None)
9984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                c.close()
10084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                self._thread.join(timeout)
10184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                if self._thread.is_alive():
10284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                    util.sub_warning('_ResourceSharer thread did '
10384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                                     'not stop when asked')
10484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                self._listener.close()
10584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                self._thread = None
10684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                self._address = None
10784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                self._listener = None
10884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                for key, (send, close) in self._cache.items():
10984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                    close()
11084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                self._cache.clear()
11184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
11284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    def _afterfork(self):
11384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        for key, (send, close) in self._cache.items():
11484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            close()
11584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._cache.clear()
11684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        # If self._lock was locked at the time of the fork, it may be broken
11784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        # -- see issue 6721.  Replace it without letting it be gc'ed.
11884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._old_locks.append(self._lock)
11984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._lock = threading.Lock()
12084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        if self._listener is not None:
12184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            self._listener.close()
12284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._listener = None
12384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._address = None
12484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._thread = None
12584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
12684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    def _start(self):
12784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        from .connection import Listener
12884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        assert self._listener is None
12984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        util.debug('starting listener and thread for sending handles')
13084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._listener = Listener(authkey=process.current_process().authkey)
13184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._address = self._listener.address
13284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        t = threading.Thread(target=self._serve)
13384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        t.daemon = True
13484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        t.start()
13584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        self._thread = t
13684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
13784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk    def _serve(self):
13884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        if hasattr(signal, 'pthread_sigmask'):
13984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
14084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk        while 1:
14184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            try:
14284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                with self._listener.accept() as conn:
14384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                    msg = conn.recv()
14484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                    if msg is None:
14584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                        break
14684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                    key, destination_pid = msg
14784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                    send, close = self._cache.pop(key)
14884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                    try:
14984ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                        send(conn, destination_pid)
15084ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                    finally:
15184ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                        close()
15284ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk            except:
15384ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                if not util.is_exiting():
15484ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk                    sys.excepthook(*sys.exc_info())
15584ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
15684ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk
15784ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerk_resource_sharer = _ResourceSharer()
15884ed9a68bd9a13252b376b21a9167dabae254325Richard Oudkerkstop = _resource_sharer.stop
159