1#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# Licensed to PSF under a Contributor Agreement.
9#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
12
13#
14# Imports
15#
16
17import sys
18import threading
19import array
20import queue
21
22from time import time as _time
23from traceback import format_exc
24
25from . import connection
26from .context import reduction, get_spawning_popen
27from . import pool
28from . import process
29from . import util
30from . import get_context
31
32#
33# Register some things for pickling
34#
35
def reduce_array(a):
    '''
    Reduce the array `a` to a `(constructor, args)` pair for pickling

    The args are the array's typecode and its contents as bytes.
    '''
    state = (a.typecode, a.tobytes())
    return array.array, state
# Pickle arrays through reduce_array().
reduction.register(array.array, reduce_array)

# Register the types returned by dict.items(), dict.keys() and dict.values()
# so that they are pickled as plain lists.
view_types = [type(view) for view in ({}.items(), {}.keys(), {}.values())]
if view_types[0] is not list:       # only needed in Py3.0
    def rebuild_as_list(obj):
        # Rebuild on the receiving side with list(<copy of the contents>).
        contents = list(obj)
        return list, (contents,)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
46
47#
48# Type for identifying shared objects
49#
50
class Token(object):
    '''
    Type to uniquely identify a shared object

    A token records the `typeid` the object was registered under, the
    `address` stored alongside it and the `id` of the object.
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # The pickled state is simply the three slots, in declaration order.
        return (self.typeid, self.address, self.id)

    def __setstate__(self, state):
        # `state` must unpack into exactly three values.
        self.typeid, self.address, self.id = state

    def __repr__(self):
        return '%s(typeid=%r, address=%r, id=%r)' % (
            self.__class__.__name__, self.typeid, self.address, self.id,
            )
69
70#
71# Function for communication with a manager's server process
72#
73
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send a message to manager using connection `c` and return response

    The request sent is the tuple `(id, methodname, args, kwds)`.  The reply
    must be a `(kind, result)` pair: `result` is returned when `kind` is
    '#RETURN', otherwise the exception built by `convert_to_error()` is
    raised.  The default `kwds` dict is only sent, never modified.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
83
def convert_to_error(kind, result):
    '''
    Convert a non-'#RETURN' reply into an exception instance

    The exception is returned, not raised, so that the caller can raise it.

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies carry text and are wrapped in
    a `RemoteError`.  Any other `kind` gives a `ValueError`.

    Raises TypeError if a '#TRACEBACK' or '#UNSERIALIZABLE' reply does not
    carry a string.  (This is an explicit check rather than an `assert` so
    that it is still performed when Python runs with -O.)
    '''
    if kind == '#ERROR':
        return result
    elif kind in ('#TRACEBACK', '#UNSERIALIZABLE'):
        if not isinstance(result, str):
            raise TypeError(
                'Result %r (kind %r) type is %s, not str' %
                (result, kind, type(result))
                )
        if kind == '#UNSERIALIZABLE':
            return RemoteError('Unserializable message: %s\n' % result)
        return RemoteError(result)
    else:
        return ValueError('Unrecognized message type')
95
class RemoteError(Exception):
    '''
    Exception whose first argument is text received from the manager

    `str()` shows that text between two rules of 75 dashes.
    '''
    def __str__(self):
        rule = '-' * 75
        return ''.join(('\n', rule, '\n', str(self.args[0]), rule))
99
100#
101# Functions for finding the method names of an object
102#
103
def all_methods(obj):
    '''
    Return a list of names of methods of `obj`

    A name is included when the attribute of that name is callable; names
    come back in the order given by `dir(obj)`.
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
114
def public_methods(obj):
    '''
    Return a list of names of methods of `obj` which do not start with '_'
    '''
    public = []
    for name in all_methods(obj):
        if name[0] == '_':
            continue
        public.append(name)
    return public
120
121#
122# Server which is run in a process controlled by a manager
123#
124
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    Every accepted connection is handled in its own thread.  The first
    request on a connection names one of the methods listed in `public`
    (see `handle_request()`); after 'accept_connection' the same connection
    is then used for method calls on shared objects (see `serve_client()`).
    '''
    # Names of the server methods which may be requested over a connection.
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Create the listener and the (initially empty) tables of objects

        `registry` maps a typeid to a `(callable, exposed, method_to_typeid,
        proxytype)` tuple, `address` is handed to the listener and
        `serializer` is a key of `listener_client`.

        Raises TypeError if `authkey` is not a bytes object.
        '''
        # Explicit check (not an assert) so it still runs under -O.
        if not isinstance(authkey, bytes):
            raise TypeError(
                'Authkey %r is type %s, not bytes' % (authkey, type(authkey))
                )
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener = listener_client[serializer][0]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident -> (obj, exposed, method_to_typeid); '0' is a placeholder
        self.id_to_obj = {'0': (None, ())}
        # ident -> number of references held on the object
        self.id_to_refcount = {}
        # entries stashed by RebuildProxy for proxies rebuilt in this process
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server forever

        Connections are accepted in a daemon thread while this thread waits
        for `stop_event`.  The process is always ended with `sys.exit(0)`.
        '''
        self.stop_event = threading.Event()
        # RebuildProxy looks this attribute up to recognise the server process
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a new daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        After authentication one request `(ignored, funcname, args, kwds)`
        is read and `funcname`, which must be listed in `public`, is called
        as `funcname(c, *args, **kwds)`.  The reply is ('#RETURN', result)
        or ('#TRACEBACK', text).  The connection is always closed before
        returning, even when the called function ends the thread by raising
        SystemExit (as `serve_client()` does).
        '''
        funcname = result = request = None
        try:
            try:
                connection.deliver_challenge(c, self.authkey)
                connection.answer_challenge(c, self.authkey)
                request = c.recv()
                ignore, funcname, args, kwds = request
                # Explicit check (not an assert): this is the only thing
                # stopping a client from calling an arbitrary attribute,
                # so it must not disappear under -O.
                if funcname not in self.public:
                    raise ValueError('%r unrecognized' % funcname)
                func = getattr(self, funcname)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                try:
                    result = func(c, *args, **kwds)
                except Exception:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    msg = ('#RETURN', result)
            try:
                c.send(msg)
            except Exception as e:
                try:
                    c.send(('#TRACEBACK', format_exc()))
                except Exception:
                    pass
                util.info('Failure to send message: %r', msg)
                util.info(' ... request was %r', request)
                util.info(' ... exception was %r', e)
        finally:
            c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Each request is `(ident, methodname, args, kwds)`.  The reply is one
        of ('#RETURN', value), ('#PROXY', (exposed, token)), ('#ERROR', exc),
        ('#TRACEBACK', text) or ('#UNSERIALIZABLE', text).  The thread exits
        on EOF or when a reply cannot be sent at all.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # fall back on the objects stashed for local proxies,
                    # but report the original lookup failure
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    msg = ('#ERROR', e)
                else:
                    # methods listed in method_to_typeid return a proxy to
                    # their result instead of a copy of it
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # methods which are not exposed may still have a
                    # server-side fallback implementation
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''
        Fallback for '#GETVALUE': return the object itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Fallback for '__str__': return `str(obj)`
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Fallback for '__repr__': return `repr(obj)`
        '''
        return repr(obj)

    # Method names which serve_client() can answer itself.  The values are
    # plain functions, so serve_client() passes `self` explicitly.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting
        '''
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append('  %s:       refcount=%s\n    %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process

        The reply is sent from here, before `stop_event` is set.  A failure
        to send it is printed but never prevents the shutdown.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except Exception:
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(self, c, typeid, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns `(ident, exposed)` where `exposed` is a tuple of method
        names.  If no callable is registered for `typeid` then the single
        positional argument is itself shared.

        Raises ValueError if there is no callable and the arguments are not
        exactly one positional argument; raises TypeError if the registered
        method_to_typeid is neither None nor a dict.
        '''
        with self.mutex:
            factory, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if factory is None:
                if kwds or len(args) != 1:
                    raise ValueError(
                        'Without callable, must have one non-keyword argument'
                        )
                obj = args[0]
            else:
                obj = factory(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        'Method_to_typeid %r: type %s, not dict' %
                        (method_to_typeid, type(method_to_typeid))
                        )
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the mutex itself, so call it after releasing it
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection

        The current thread is renamed to `name` and then serves `c` itself.
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one reference to the object `ident`

        Raises KeyError if the object is neither tracked nor stashed in
        `id_to_local_proxy_obj`.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Drop one reference to the object `ident`

        The object is disposed of once its count reaches zero.  Raises
        AssertionError if the recorded count is already below one.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            # Raised explicitly (not via assert) so the check survives -O.
            if self.id_to_refcount[ident] < 1:
                raise AssertionError(
                    'Id %s (%r) has refcount %s, not 1+' %
                    (ident, self.id_to_obj[ident],
                     self.id_to_refcount[ident])
                    )
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
432
433
434#
435# Class to represent state of a manager
436#
437
class State(object):
    '''
    Holder for the state of a manager

    `value` is expected to be one of the constants defined below.
    '''
    __slots__ = ['value']
    INITIAL, STARTED, SHUTDOWN = 0, 1, 2
443
444#
445# Mapping from serializer name to Listener and Client types
446#
447
# serializer name -> (Listener type, Client type)
listener_client = {}
listener_client['pickle'] = (connection.Listener, connection.Client)
listener_client['xmlrpclib'] = (connection.XmlListener, connection.XmlClient)
452
453#
454# Definition of BaseManager
455#
456
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns a server process (`start()`) or attaches to an
    existing one (`connect()`).  Types are made available with `register()`,
    which stores them in the class-level `_registry`.
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype)
    _registry = {}
    # server class instantiated by _run_server()
    _Server = Server

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record the connection settings; no process is started here

        `authkey` defaults to the current process's authkey, `serializer`
        must be a key of `listener_client` and `ctx` defaults to
        `get_context()`.
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute
        '''
        assert self._state.value == State.INITIAL
        # NOTE(review): this uses the module-level Server whereas
        # _run_server() uses cls._Server -- confirm which is intended.
        return Server(self._registry, self._address,
                      self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process

        A 'dummy' request is sent to check that the server answers; the
        connection used for it is closed again before returning.
        '''
        Client = listener_client[self._serializer][1]
        conn = Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If `initializer` is given it is called in the server process as
        `initializer(*initargs)` before the server is created.

        Raises TypeError if `initializer` is neither None nor callable.
        '''
        assert self._state.value == State.INITIAL

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it
        '''
        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)

        Does nothing when no process is recorded, which includes the case
        where `start()` was never called.
        '''
        # _process is only assigned by start(), so it may be missing
        if getattr(self, '_process', None) is not None:
            self._process.join(timeout)
            if not self._process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        # start the server unless that has been done already
        if self._state.value == State.INITIAL:
            self.start()
        assert self._state.value == State.STARTED
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # `shutdown` is the finalizer installed by start()
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # best effort: fall through to join()/terminate() below
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    address = property(lambda self: self._address)

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to `AutoProxy`.  `exposed` and
        `method_to_typeid` default to the proxy type's `_exposed_` and
        `_method_to_typeid_` attributes, if it has them.  Unless
        `create_method` is false, a method named `typeid` which creates a
        shared object and returns a proxy for it is added to the class.
        '''
        # give this class its own registry instead of mutating an inherited one
        if '_registry' not in cls.__dict__:
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in method_to_typeid.items():
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # the server's create() took a reference and the proxy has
                # been constructed by now, so give the former back
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
672
673#
# Subclass of set which gets cleared after a fork
675#
676
class ProcessLocalSet(set):
    '''
    Set which registers an after-fork callback that clears it

    It always pickles as an empty set of the same type.
    '''
    def __init__(self):
        def _clear_after_fork(obj):
            obj.clear()
        util.register_after_fork(self, _clear_after_fork)

    def __reduce__(self):
        return (type(self), ())
682
683#
684# Definition of BaseProxy
685#
686
class BaseProxy(object):
    '''
    A base for proxies of shared objects

    Method calls are forwarded to the manager over a per-thread connection
    (see `_callmethod()`).  Reference counting on the manager is done over
    separate short-lived connections, each closed as soon as it has been
    used.
    '''
    # manager address -> (thread-local connection holder, set of owned ids)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        '''
        Create a proxy for the object identified by `token`

        `authkey` defaults to the manager's authkey or, without a manager,
        to the current process's authkey.  Unless `incref` is false a
        reference is taken on the manager.  `exposed` is accepted but not
        used by this base class.
        '''
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's connection to the manager and store it in the tls
        '''
        util.debug('making connection to manager')
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is turned into a new proxy, which requires this
        proxy to have a manager.  Any other non-'#RETURN' reply is raised as
        the exception built by `convert_to_error()`.  The default `kwds`
        dict is only sent, never modified.
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # The server took a reference when it created the object and
            # the new proxy has been constructed, so give the former back.
            # A separate connection is used and closed straight away.
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Take a reference on the manager and register the matching decref

        Nothing is done for proxies owned by the manager itself.
        '''
        if self._owned_by_manager:
            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
            return

        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release the reference taken by `_incref()`

        A static method because it is registered as a finalizer.  Failures
        to reach the manager are only logged.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        '''
        Drop the manager and take a new reference after a fork
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s' % e)

    def __reduce__(self):
        # the authkey is only pickled while a process is being spawned
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        # a deep copy of a proxy is a copy of the referent
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %#x>' % \
               (type(self).__name__, self._token.typeid, id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
861
862#
863# Function used for unpickling
864#
865
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    When running in the server process of the manager at `token.address`
    the proxy is marked as owned by the manager and the referent is stashed
    in the server's `id_to_local_proxy_obj`.  No reference is taken while
    the current process has a true `_inheriting` attribute.  Note that
    `kwds` is modified in place.
    '''
    current = process.current_process()
    server = getattr(current, '_manager_server', None)
    if server and server.address == token.address:
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        stash = server.id_to_local_proxy_obj
        if token.id not in stash:
            stash[token.id] = server.id_to_obj[token.id]

    incref = kwds.pop('incref', True)
    if incref:
        incref = not getattr(current, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
882
883#
884# Functions to create proxies and proxy types
885#
886
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a proxy type whose methods are given by `exposed`

    Each generated method forwards its arguments to `_callmethod()`.  The
    types are cached in `_cache`, keyed on `(name, tuple(exposed))`, so the
    mutable default is deliberate.
    '''
    exposed = tuple(exposed)
    key = (name, exposed)
    if key in _cache:
        return _cache[key]

    # source of one forwarding method; filled in with the method name twice
    template = '''def %s(self, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''

    namespace = {}
    for meth in exposed:
        exec(template % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[key] = ProxyType
    return ProxyType
907
908
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    The proxy type is generated with `MakeProxyType()` from `exposed`.  If
    `exposed` is None the method names are obtained by sending a
    'get_methods' request for `token` to the address stored in the token.

    `authkey` falls back to `manager._authkey` and then to the current
    process's authkey.  `incref` and `manager_owned` are passed straight to
    the proxy constructor.

    `manager_owned` has to be accepted here because `RebuildProxy()` adds
    `manager_owned=True` to the keyword arguments when a proxy is unpickled
    inside the manager server that owns it, and auto-proxies are unpickled
    through this function; without the parameter that call raises
    TypeError.
    '''
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # Read by __reduce__ so that the proxy is pickled as an AutoProxy call
    # carrying its exposed method names.
    proxy._isauto = True
    return proxy
933
934#
935# Types/callables which we will register with SyncManager
936#
937
class Namespace(object):
    '''
    Simple attribute container: keyword arguments become attributes.
    '''

    def __init__(self, **kwds):
        vars(self).update(kwds)

    def __repr__(self):
        '''
        Return `ClassName(a=..., b=...)` listing the attributes whose names
        do not start with an underscore, sorted by their `name=value` text.
        '''
        # Iterate over a snapshot of the attribute dict.
        snapshot = list(self.__dict__.items())
        shown = sorted('%s=%r' % (name, value)
                       for name, value in snapshot
                       if not name.startswith('_'))
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
949
class Value(object):
    '''
    Holder for a single value together with its typecode.

    The value is reachable through `get()` / `set()` and through the
    `value` property, which is built from those two functions.
    '''

    def __init__(self, typecode, value, lock=True):
        # `lock` is accepted but not used by this class.
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    value = property(get, set)

    def __repr__(self):
        fields = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % fields
961
def Array(typecode, sequence, lock=True):
    '''
    Return a new `array.array` of `typecode` initialised from `sequence`.

    `lock` is accepted but not used.
    '''
    new_array = array.array(typecode, sequence)
    return new_array
964
965#
966# Proxy types used by SyncManager
967#
968
class IteratorProxy(BaseProxy):
    '''
    Proxy for an iterator or generator referent.

    `__next__`, `send`, `throw` and `close` pass their positional arguments
    unchanged to the method of the same name via `_callmethod()`.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')
    def __iter__(self):
        # The proxy acts as its own iterator; no remote call is made here.
        return self
    def __next__(self, *args):
        return self._callmethod('__next__', args)
    def send(self, *args):
        return self._callmethod('send', args)
    def throw(self, *args):
        return self._callmethod('throw', args)
    def close(self, *args):
        return self._callmethod('close', args)
981
982
class AcquirerProxy(BaseProxy):
    '''
    Proxy for referents offering `acquire()` / `release()`; usable as a
    context manager.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # `timeout` is only transmitted when the caller supplied one.
        if timeout is None:
            call_args = (blocking,)
        else:
            call_args = (blocking, timeout)
        return self._callmethod('acquire', call_args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # No arguments are sent, so the referent's acquire() is called
        # without any.
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
994
995
class ConditionProxy(AcquirerProxy):
    '''
    Proxy for a condition-variable referent.

    `wait`, `notify` and `notify_all` are forwarded through
    `_callmethod()`; `wait_for` is implemented on the proxy side because
    the predicate is evaluated by the caller.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        # Mirrors threading.Condition.notify(n=1); the default keeps the
        # behaviour of the former zero-argument form.
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Call `self.wait()` repeatedly until `predicate()` is true or
        `timeout` seconds have elapsed; return the last predicate value.
        '''
        result = predicate()
        if result:
            return result
        # With no timeout every wait() blocks indefinitely (waittime None);
        # otherwise each wait() gets the time remaining until the deadline.
        endtime = None if timeout is None else _time() + timeout
        waittime = None
        while not result:
            if endtime is not None:
                waittime = endtime - _time()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result
1021
1022
class EventProxy(BaseProxy):
    '''
    Proxy for an event referent: every method is forwarded to the method
    of the same name through `_callmethod()`.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')
    def is_set(self):
        return self._callmethod('is_set')
    def set(self):
        return self._callmethod('set')
    def clear(self):
        return self._callmethod('clear')
    def wait(self, timeout=None):
        # `timeout` is always transmitted, None included.
        return self._callmethod('wait', (timeout,))
1033
1034
class BarrierProxy(BaseProxy):
    '''
    Proxy for a barrier referent.

    `wait`, `abort` and `reset` are forwarded through `_callmethod()`.
    The read-only properties `parties`, `n_waiting` and `broken` fetch the
    attribute of the same name by calling the referent's
    `__getattribute__`, which is why that method is exposed.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')
    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))
    def abort(self):
        return self._callmethod('abort')
    def reset(self):
        return self._callmethod('reset')
    @property
    def parties(self):
        return self._callmethod('__getattribute__', ('parties',))
    @property
    def n_waiting(self):
        return self._callmethod('__getattribute__', ('n_waiting',))
    @property
    def broken(self):
        return self._callmethod('__getattribute__', ('broken',))
1052
1053
class NamespaceProxy(BaseProxy):
    '''
    Proxy whose attribute access is redirected to the referent.

    Names beginning with an underscore are handled locally on the proxy
    object; every other name is read, written or deleted by calling the
    referent's `__getattribute__`, `__setattr__` or `__delattr__`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            # `_callmethod` is looked up via object.__getattribute__ so the
            # lookup does not go through this class's attribute hooks.
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            remote_call = object.__getattribute__(self, '_callmethod')
            return remote_call('__delattr__', (key,))
        return object.__delattr__(self, key)
1071
1072
class ValueProxy(BaseProxy):
    '''
    Proxy for a referent offering `get()` and `set(value)`.

    The `value` property is built from this class's own `get` and `set`
    functions, so reading or assigning `proxy.value` performs the
    corresponding `_callmethod()` call.
    '''
    _exposed_ = ('get', 'set')
    def get(self):
        return self._callmethod('get')
    def set(self, value):
        return self._callmethod('set', (value,))
    value = property(get, set)
1080
1081
# Generated proxy type: MakeProxyType creates one forwarding method for each
# name listed here.
BaseListProxy = MakeProxyType('BaseListProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
    'reverse', 'sort', '__imul__'
    ))
class ListProxy(BaseListProxy):
    '''
    List proxy with in-place operators that return the proxy itself.

    An augmented assignment rebinds the target to the method's return
    value, so `__iadd__` and `__imul__` return `self` to keep the name
    bound to the proxy after `proxy += x` or `proxy *= n`.
    '''
    def __iadd__(self, value):
        # In-place addition is carried out as a remote extend().
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        # Overrides the generated __imul__, which would return the result
        # of the remote call instead of the proxy.
        self._callmethod('__imul__', (value,))
        return self
1095
1096
# Generated proxy type for dict referents.
#
# '__iter__' is exposed so that `for key in proxy` works: without an
# `__iter__` method Python falls back to calling `__getitem__` with 0, 1,
# 2, ..., which raises KeyError for a dict.
#
# 'has_key' is a leftover (dict has no such method in Python 3); it is kept
# only so that the set of exposed names does not shrink.
DictProxy = MakeProxyType('DictProxy', (
    '__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
    ))
# The iterator produced by `__iter__` is handed back under the 'Iterator'
# typeid (registered with IteratorProxy on SyncManager), following the same
# `_method_to_typeid_` convention BasePoolProxy uses for `imap`.
DictProxy._method_to_typeid_ = {
    '__iter__': 'Iterator',
    }
1102
1103
# Generated proxy type exposing only length and item access/assignment;
# registered for the 'Array' typeid on SyncManager.
ArrayProxy = MakeProxyType('ArrayProxy', (
    '__len__', '__getitem__', '__setitem__'
    ))
1107
1108
# Generated proxy type with one forwarding method per name listed.  Note
# that the generated class is itself named 'PoolProxy'.
BasePoolProxy = MakeProxyType('PoolProxy', (
    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
    'map', 'map_async', 'starmap', 'starmap_async', 'terminate',
    ))
# Maps method names to typeids; both typeids used here are registered on
# SyncManager with create_method=False.
# NOTE(review): presumably the results of these methods are returned as
# proxies of the given typeid -- the code consuming this mapping is outside
# this section; confirm there.
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator'
    }
class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable as a context manager: entering returns the proxy,
    leaving calls `terminate()`.
    '''
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
1125
1126#
1127# Definition of SyncManager
1128#
1129
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The typeids registered on this class (by the `SyncManager.register()`
    calls that follow the class definition) are the `threading`
    synchronization primitives `Event`, `Lock`, `RLock`, `Semaphore`,
    `BoundedSemaphore`, `Condition` and `Barrier`, together with `Queue`,
    `JoinableQueue`, `Pool`, `list`, `dict`, `Value`, `Array` and
    `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''
1140
# Register the shared types on SyncManager.  Arguments are the typeid, the
# callable that creates the referent and, where given, the proxy type.
# 'Queue' and 'JoinableQueue' both use queue.Queue, which already provides
# task_done() and join(); neither passes an explicit proxy type.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
# All lock-like primitives share AcquirerProxy (acquire/release only).
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
# Value, Array and Namespace are the plain helper types defined in this
# module, not the shared-memory types.
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
# (no callable is given and create_method=False is passed for both)
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
1161