1#
2# Module providing the `SyncManager` class for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# All rights reserved.
9#
10# Redistribution and use in source and binary forms, with or without
11# modification, are permitted provided that the following conditions
12# are met:
13#
14# 1. Redistributions of source code must retain the above copyright
15#    notice, this list of conditions and the following disclaimer.
16# 2. Redistributions in binary form must reproduce the above copyright
17#    notice, this list of conditions and the following disclaimer in the
18#    documentation and/or other materials provided with the distribution.
19# 3. Neither the name of author nor the names of any contributors may be
20#    used to endorse or promote products derived from this software
21#    without specific prior written permission.
22#
23# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
24# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
27# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33# SUCH DAMAGE.
34#
35
36__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ]
37
38#
39# Imports
40#
41
42import os
43import sys
44import weakref
45import threading
46import array
47import Queue
48
49from traceback import format_exc
50from multiprocessing import Process, current_process, active_children, Pool, util, connection
51from multiprocessing.process import AuthenticationString
52from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler
53from multiprocessing.util import Finalize, info
54
55try:
56    from cPickle import PicklingError
57except ImportError:
58    from pickle import PicklingError
59
60#
61# Register some things for pickling
62#
63
64def reduce_array(a):
65    return array.array, (a.typecode, a.tostring())
66ForkingPickler.register(array.array, reduce_array)
67
68view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
69
70#
71# Type for identifying shared objects
72#
73
74class Token(object):
75    '''
76    Type to uniquely indentify a shared object
77    '''
78    __slots__ = ('typeid', 'address', 'id')
79
80    def __init__(self, typeid, address, id):
81        (self.typeid, self.address, self.id) = (typeid, address, id)
82
83    def __getstate__(self):
84        return (self.typeid, self.address, self.id)
85
86    def __setstate__(self, state):
87        (self.typeid, self.address, self.id) = state
88
89    def __repr__(self):
90        return 'Token(typeid=%r, address=%r, id=%r)' % \
91               (self.typeid, self.address, self.id)
92
93#
94# Function for communication with a manager's server process
95#
96
97def dispatch(c, id, methodname, args=(), kwds={}):
98    '''
99    Send a message to manager using connection `c` and return response
100    '''
101    c.send((id, methodname, args, kwds))
102    kind, result = c.recv()
103    if kind == '#RETURN':
104        return result
105    raise convert_to_error(kind, result)
106
107def convert_to_error(kind, result):
108    if kind == '#ERROR':
109        return result
110    elif kind == '#TRACEBACK':
111        assert type(result) is str
112        return  RemoteError(result)
113    elif kind == '#UNSERIALIZABLE':
114        assert type(result) is str
115        return RemoteError('Unserializable message: %s\n' % result)
116    else:
117        return ValueError('Unrecognized message type')
118
119class RemoteError(Exception):
120    def __str__(self):
121        return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75)
122
123#
124# Functions for finding the method names of an object
125#
126
127def all_methods(obj):
128    '''
129    Return a list of names of methods of `obj`
130    '''
131    temp = []
132    for name in dir(obj):
133        func = getattr(obj, name)
134        if hasattr(func, '__call__'):
135            temp.append(name)
136    return temp
137
138def public_methods(obj):
139    '''
140    Return a list of names of methods of `obj` which do not start with '_'
141    '''
142    return [name for name in all_methods(obj) if name[0] != '_']
143
144#
145# Server which is run in a process controlled by a manager
146#
147
148class Server(object):
149    '''
150    Server class which runs in a process controlled by a manager object
151    '''
152    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
153              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']
154
155    def __init__(self, registry, address, authkey, serializer):
156        assert isinstance(authkey, bytes)
157        self.registry = registry
158        self.authkey = AuthenticationString(authkey)
159        Listener, Client = listener_client[serializer]
160
161        # do authentication later
162        self.listener = Listener(address=address, backlog=16)
163        self.address = self.listener.address
164
165        self.id_to_obj = {'0': (None, ())}
166        self.id_to_refcount = {}
167        self.mutex = threading.RLock()
168        self.stop = 0
169
170    def serve_forever(self):
171        '''
172        Run the server forever
173        '''
174        current_process()._manager_server = self
175        try:
176            try:
177                while 1:
178                    try:
179                        c = self.listener.accept()
180                    except (OSError, IOError):
181                        continue
182                    t = threading.Thread(target=self.handle_request, args=(c,))
183                    t.daemon = True
184                    t.start()
185            except (KeyboardInterrupt, SystemExit):
186                pass
187        finally:
188            self.stop = 999
189            self.listener.close()
190
191    def handle_request(self, c):
192        '''
193        Handle a new connection
194        '''
195        funcname = result = request = None
196        try:
197            connection.deliver_challenge(c, self.authkey)
198            connection.answer_challenge(c, self.authkey)
199            request = c.recv()
200            ignore, funcname, args, kwds = request
201            assert funcname in self.public, '%r unrecognized' % funcname
202            func = getattr(self, funcname)
203        except Exception:
204            msg = ('#TRACEBACK', format_exc())
205        else:
206            try:
207                result = func(c, *args, **kwds)
208            except Exception:
209                msg = ('#TRACEBACK', format_exc())
210            else:
211                msg = ('#RETURN', result)
212        try:
213            c.send(msg)
214        except Exception, e:
215            try:
216                c.send(('#TRACEBACK', format_exc()))
217            except Exception:
218                pass
219            util.info('Failure to send message: %r', msg)
220            util.info(' ... request was %r', request)
221            util.info(' ... exception was %r', e)
222
223        c.close()
224
225    def serve_client(self, conn):
226        '''
227        Handle requests from the proxies in a particular process/thread
228        '''
229        util.debug('starting server thread to service %r',
230                   threading.current_thread().name)
231
232        recv = conn.recv
233        send = conn.send
234        id_to_obj = self.id_to_obj
235
236        while not self.stop:
237
238            try:
239                methodname = obj = None
240                request = recv()
241                ident, methodname, args, kwds = request
242                obj, exposed, gettypeid = id_to_obj[ident]
243
244                if methodname not in exposed:
245                    raise AttributeError(
246                        'method %r of %r object is not in exposed=%r' %
247                        (methodname, type(obj), exposed)
248                        )
249
250                function = getattr(obj, methodname)
251
252                try:
253                    res = function(*args, **kwds)
254                except Exception, e:
255                    msg = ('#ERROR', e)
256                else:
257                    typeid = gettypeid and gettypeid.get(methodname, None)
258                    if typeid:
259                        rident, rexposed = self.create(conn, typeid, res)
260                        token = Token(typeid, self.address, rident)
261                        msg = ('#PROXY', (rexposed, token))
262                    else:
263                        msg = ('#RETURN', res)
264
265            except AttributeError:
266                if methodname is None:
267                    msg = ('#TRACEBACK', format_exc())
268                else:
269                    try:
270                        fallback_func = self.fallback_mapping[methodname]
271                        result = fallback_func(
272                            self, conn, ident, obj, *args, **kwds
273                            )
274                        msg = ('#RETURN', result)
275                    except Exception:
276                        msg = ('#TRACEBACK', format_exc())
277
278            except EOFError:
279                util.debug('got EOF -- exiting thread serving %r',
280                           threading.current_thread().name)
281                sys.exit(0)
282
283            except Exception:
284                msg = ('#TRACEBACK', format_exc())
285
286            try:
287                try:
288                    send(msg)
289                except Exception, e:
290                    send(('#UNSERIALIZABLE', repr(msg)))
291            except Exception, e:
292                util.info('exception in thread serving %r',
293                        threading.current_thread().name)
294                util.info(' ... message was %r', msg)
295                util.info(' ... exception was %r', e)
296                conn.close()
297                sys.exit(1)
298
299    def fallback_getvalue(self, conn, ident, obj):
300        return obj
301
302    def fallback_str(self, conn, ident, obj):
303        return str(obj)
304
305    def fallback_repr(self, conn, ident, obj):
306        return repr(obj)
307
308    fallback_mapping = {
309        '__str__':fallback_str,
310        '__repr__':fallback_repr,
311        '#GETVALUE':fallback_getvalue
312        }
313
314    def dummy(self, c):
315        pass
316
317    def debug_info(self, c):
318        '''
319        Return some info --- useful to spot problems with refcounting
320        '''
321        self.mutex.acquire()
322        try:
323            result = []
324            keys = self.id_to_obj.keys()
325            keys.sort()
326            for ident in keys:
327                if ident != '0':
328                    result.append('  %s:       refcount=%s\n    %s' %
329                                  (ident, self.id_to_refcount[ident],
330                                   str(self.id_to_obj[ident][0])[:75]))
331            return '\n'.join(result)
332        finally:
333            self.mutex.release()
334
335    def number_of_objects(self, c):
336        '''
337        Number of shared objects
338        '''
339        return len(self.id_to_obj) - 1      # don't count ident='0'
340
341    def shutdown(self, c):
342        '''
343        Shutdown this process
344        '''
345        try:
346            try:
347                util.debug('manager received shutdown message')
348                c.send(('#RETURN', None))
349
350                if sys.stdout != sys.__stdout__:
351                    util.debug('resetting stdout, stderr')
352                    sys.stdout = sys.__stdout__
353                    sys.stderr = sys.__stderr__
354
355                util._run_finalizers(0)
356
357                for p in active_children():
358                    util.debug('terminating a child process of manager')
359                    p.terminate()
360
361                for p in active_children():
362                    util.debug('terminating a child process of manager')
363                    p.join()
364
365                util._run_finalizers()
366                util.info('manager exiting with exitcode 0')
367            except:
368                import traceback
369                traceback.print_exc()
370        finally:
371            exit(0)
372
373    def create(self, c, typeid, *args, **kwds):
374        '''
375        Create a new shared object and return its id
376        '''
377        self.mutex.acquire()
378        try:
379            callable, exposed, method_to_typeid, proxytype = \
380                      self.registry[typeid]
381
382            if callable is None:
383                assert len(args) == 1 and not kwds
384                obj = args[0]
385            else:
386                obj = callable(*args, **kwds)
387
388            if exposed is None:
389                exposed = public_methods(obj)
390            if method_to_typeid is not None:
391                assert type(method_to_typeid) is dict
392                exposed = list(exposed) + list(method_to_typeid)
393
394            ident = '%x' % id(obj)  # convert to string because xmlrpclib
395                                    # only has 32 bit signed integers
396            util.debug('%r callable returned object with id %r', typeid, ident)
397
398            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
399            if ident not in self.id_to_refcount:
400                self.id_to_refcount[ident] = 0
401            # increment the reference count immediately, to avoid
402            # this object being garbage collected before a Proxy
403            # object for it can be created.  The caller of create()
404            # is responsible for doing a decref once the Proxy object
405            # has been created.
406            self.incref(c, ident)
407            return ident, tuple(exposed)
408        finally:
409            self.mutex.release()
410
411    def get_methods(self, c, token):
412        '''
413        Return the methods of the shared object indicated by token
414        '''
415        return tuple(self.id_to_obj[token.id][1])
416
417    def accept_connection(self, c, name):
418        '''
419        Spawn a new thread to serve this connection
420        '''
421        threading.current_thread().name = name
422        c.send(('#RETURN', None))
423        self.serve_client(c)
424
425    def incref(self, c, ident):
426        self.mutex.acquire()
427        try:
428            self.id_to_refcount[ident] += 1
429        finally:
430            self.mutex.release()
431
432    def decref(self, c, ident):
433        self.mutex.acquire()
434        try:
435            assert self.id_to_refcount[ident] >= 1
436            self.id_to_refcount[ident] -= 1
437            if self.id_to_refcount[ident] == 0:
438                del self.id_to_obj[ident], self.id_to_refcount[ident]
439                util.debug('disposing of obj with id %r', ident)
440        finally:
441            self.mutex.release()
442
443#
444# Class to represent state of a manager
445#
446
447class State(object):
448    __slots__ = ['value']
449    INITIAL = 0
450    STARTED = 1
451    SHUTDOWN = 2
452
453#
454# Mapping from serializer name to Listener and Client types
455#
456
457listener_client = {
458    'pickle' : (connection.Listener, connection.Client),
459    'xmlrpclib' : (connection.XmlListener, connection.XmlClient)
460    }
461
462#
463# Definition of BaseManager
464#
465
466class BaseManager(object):
467    '''
468    Base class for managers
469    '''
470    _registry = {}
471    _Server = Server
472
473    def __init__(self, address=None, authkey=None, serializer='pickle'):
474        if authkey is None:
475            authkey = current_process().authkey
476        self._address = address     # XXX not final address if eg ('', 0)
477        self._authkey = AuthenticationString(authkey)
478        self._state = State()
479        self._state.value = State.INITIAL
480        self._serializer = serializer
481        self._Listener, self._Client = listener_client[serializer]
482
483    def __reduce__(self):
484        return type(self).from_address, \
485               (self._address, self._authkey, self._serializer)
486
487    def get_server(self):
488        '''
489        Return server object with serve_forever() method and address attribute
490        '''
491        assert self._state.value == State.INITIAL
492        return Server(self._registry, self._address,
493                      self._authkey, self._serializer)
494
495    def connect(self):
496        '''
497        Connect manager object to the server process
498        '''
499        Listener, Client = listener_client[self._serializer]
500        conn = Client(self._address, authkey=self._authkey)
501        dispatch(conn, None, 'dummy')
502        self._state.value = State.STARTED
503
504    def start(self, initializer=None, initargs=()):
505        '''
506        Spawn a server process for this manager object
507        '''
508        assert self._state.value == State.INITIAL
509
510        if initializer is not None and not hasattr(initializer, '__call__'):
511            raise TypeError('initializer must be a callable')
512
513        # pipe over which we will retrieve address of server
514        reader, writer = connection.Pipe(duplex=False)
515
516        # spawn process which runs a server
517        self._process = Process(
518            target=type(self)._run_server,
519            args=(self._registry, self._address, self._authkey,
520                  self._serializer, writer, initializer, initargs),
521            )
522        ident = ':'.join(str(i) for i in self._process._identity)
523        self._process.name = type(self).__name__  + '-' + ident
524        self._process.start()
525
526        # get address of server
527        writer.close()
528        self._address = reader.recv()
529        reader.close()
530
531        # register a finalizer
532        self._state.value = State.STARTED
533        self.shutdown = util.Finalize(
534            self, type(self)._finalize_manager,
535            args=(self._process, self._address, self._authkey,
536                  self._state, self._Client),
537            exitpriority=0
538            )
539
540    @classmethod
541    def _run_server(cls, registry, address, authkey, serializer, writer,
542                    initializer=None, initargs=()):
543        '''
544        Create a server, report its address and run it
545        '''
546        if initializer is not None:
547            initializer(*initargs)
548
549        # create server
550        server = cls._Server(registry, address, authkey, serializer)
551
552        # inform parent process of the server's address
553        writer.send(server.address)
554        writer.close()
555
556        # run the manager
557        util.info('manager serving at %r', server.address)
558        server.serve_forever()
559
560    def _create(self, typeid, *args, **kwds):
561        '''
562        Create a new shared object; return the token and exposed tuple
563        '''
564        assert self._state.value == State.STARTED, 'server not yet started'
565        conn = self._Client(self._address, authkey=self._authkey)
566        try:
567            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
568        finally:
569            conn.close()
570        return Token(typeid, self._address, id), exposed
571
572    def join(self, timeout=None):
573        '''
574        Join the manager process (if it has been spawned)
575        '''
576        self._process.join(timeout)
577
578    def _debug_info(self):
579        '''
580        Return some info about the servers shared objects and connections
581        '''
582        conn = self._Client(self._address, authkey=self._authkey)
583        try:
584            return dispatch(conn, None, 'debug_info')
585        finally:
586            conn.close()
587
588    def _number_of_objects(self):
589        '''
590        Return the number of shared objects
591        '''
592        conn = self._Client(self._address, authkey=self._authkey)
593        try:
594            return dispatch(conn, None, 'number_of_objects')
595        finally:
596            conn.close()
597
598    def __enter__(self):
599        return self
600
601    def __exit__(self, exc_type, exc_val, exc_tb):
602        self.shutdown()
603
604    @staticmethod
605    def _finalize_manager(process, address, authkey, state, _Client):
606        '''
607        Shutdown the manager process; will be registered as a finalizer
608        '''
609        if process.is_alive():
610            util.info('sending shutdown message to manager')
611            try:
612                conn = _Client(address, authkey=authkey)
613                try:
614                    dispatch(conn, None, 'shutdown')
615                finally:
616                    conn.close()
617            except Exception:
618                pass
619
620            process.join(timeout=0.2)
621            if process.is_alive():
622                util.info('manager still alive')
623                if hasattr(process, 'terminate'):
624                    util.info('trying to `terminate()` manager process')
625                    process.terminate()
626                    process.join(timeout=0.1)
627                    if process.is_alive():
628                        util.info('manager still alive after terminate')
629
630        state.value = State.SHUTDOWN
631        try:
632            del BaseProxy._address_to_local[address]
633        except KeyError:
634            pass
635
636    address = property(lambda self: self._address)
637
638    @classmethod
639    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
640                 method_to_typeid=None, create_method=True):
641        '''
642        Register a typeid with the manager type
643        '''
644        if '_registry' not in cls.__dict__:
645            cls._registry = cls._registry.copy()
646
647        if proxytype is None:
648            proxytype = AutoProxy
649
650        exposed = exposed or getattr(proxytype, '_exposed_', None)
651
652        method_to_typeid = method_to_typeid or \
653                           getattr(proxytype, '_method_to_typeid_', None)
654
655        if method_to_typeid:
656            for key, value in method_to_typeid.items():
657                assert type(key) is str, '%r is not a string' % key
658                assert type(value) is str, '%r is not a string' % value
659
660        cls._registry[typeid] = (
661            callable, exposed, method_to_typeid, proxytype
662            )
663
664        if create_method:
665            def temp(self, *args, **kwds):
666                util.debug('requesting creation of a shared %r object', typeid)
667                token, exp = self._create(typeid, *args, **kwds)
668                proxy = proxytype(
669                    token, self._serializer, manager=self,
670                    authkey=self._authkey, exposed=exp
671                    )
672                conn = self._Client(token.address, authkey=self._authkey)
673                dispatch(conn, None, 'decref', (token.id,))
674                return proxy
675            temp.__name__ = typeid
676            setattr(cls, typeid, temp)
677
678#
679# Subclass of set which get cleared after a fork
680#
681
682class ProcessLocalSet(set):
683    def __init__(self):
684        util.register_after_fork(self, lambda obj: obj.clear())
685    def __reduce__(self):
686        return type(self), ()
687
688#
689# Definition of BaseProxy
690#
691
692class BaseProxy(object):
693    '''
694    A base for proxies of shared objects
695    '''
696    _address_to_local = {}
697    _mutex = util.ForkAwareThreadLock()
698
699    def __init__(self, token, serializer, manager=None,
700                 authkey=None, exposed=None, incref=True):
701        BaseProxy._mutex.acquire()
702        try:
703            tls_idset = BaseProxy._address_to_local.get(token.address, None)
704            if tls_idset is None:
705                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
706                BaseProxy._address_to_local[token.address] = tls_idset
707        finally:
708            BaseProxy._mutex.release()
709
710        # self._tls is used to record the connection used by this
711        # thread to communicate with the manager at token.address
712        self._tls = tls_idset[0]
713
714        # self._idset is used to record the identities of all shared
715        # objects for which the current process owns references and
716        # which are in the manager at token.address
717        self._idset = tls_idset[1]
718
719        self._token = token
720        self._id = self._token.id
721        self._manager = manager
722        self._serializer = serializer
723        self._Client = listener_client[serializer][1]
724
725        if authkey is not None:
726            self._authkey = AuthenticationString(authkey)
727        elif self._manager is not None:
728            self._authkey = self._manager._authkey
729        else:
730            self._authkey = current_process().authkey
731
732        if incref:
733            self._incref()
734
735        util.register_after_fork(self, BaseProxy._after_fork)
736
737    def _connect(self):
738        util.debug('making connection to manager')
739        name = current_process().name
740        if threading.current_thread().name != 'MainThread':
741            name += '|' + threading.current_thread().name
742        conn = self._Client(self._token.address, authkey=self._authkey)
743        dispatch(conn, None, 'accept_connection', (name,))
744        self._tls.connection = conn
745
746    def _callmethod(self, methodname, args=(), kwds={}):
747        '''
748        Try to call a method of the referrent and return a copy of the result
749        '''
750        try:
751            conn = self._tls.connection
752        except AttributeError:
753            util.debug('thread %r does not own a connection',
754                       threading.current_thread().name)
755            self._connect()
756            conn = self._tls.connection
757
758        conn.send((self._id, methodname, args, kwds))
759        kind, result = conn.recv()
760
761        if kind == '#RETURN':
762            return result
763        elif kind == '#PROXY':
764            exposed, token = result
765            proxytype = self._manager._registry[token.typeid][-1]
766            proxy = proxytype(
767                token, self._serializer, manager=self._manager,
768                authkey=self._authkey, exposed=exposed
769                )
770            conn = self._Client(token.address, authkey=self._authkey)
771            dispatch(conn, None, 'decref', (token.id,))
772            return proxy
773        raise convert_to_error(kind, result)
774
775    def _getvalue(self):
776        '''
777        Get a copy of the value of the referent
778        '''
779        return self._callmethod('#GETVALUE')
780
781    def _incref(self):
782        conn = self._Client(self._token.address, authkey=self._authkey)
783        dispatch(conn, None, 'incref', (self._id,))
784        util.debug('INCREF %r', self._token.id)
785
786        self._idset.add(self._id)
787
788        state = self._manager and self._manager._state
789
790        self._close = util.Finalize(
791            self, BaseProxy._decref,
792            args=(self._token, self._authkey, state,
793                  self._tls, self._idset, self._Client),
794            exitpriority=10
795            )
796
797    @staticmethod
798    def _decref(token, authkey, state, tls, idset, _Client):
799        idset.discard(token.id)
800
801        # check whether manager is still alive
802        if state is None or state.value == State.STARTED:
803            # tell manager this process no longer cares about referent
804            try:
805                util.debug('DECREF %r', token.id)
806                conn = _Client(token.address, authkey=authkey)
807                dispatch(conn, None, 'decref', (token.id,))
808            except Exception, e:
809                util.debug('... decref failed %s', e)
810
811        else:
812            util.debug('DECREF %r -- manager already shutdown', token.id)
813
814        # check whether we can close this thread's connection because
815        # the process owns no more references to objects for this manager
816        if not idset and hasattr(tls, 'connection'):
817            util.debug('thread %r has no more proxies so closing conn',
818                       threading.current_thread().name)
819            tls.connection.close()
820            del tls.connection
821
822    def _after_fork(self):
823        self._manager = None
824        try:
825            self._incref()
826        except Exception, e:
827            # the proxy may just be for a manager which has shutdown
828            util.info('incref failed: %s' % e)
829
830    def __reduce__(self):
831        kwds = {}
832        if Popen.thread_is_spawning():
833            kwds['authkey'] = self._authkey
834
835        if getattr(self, '_isauto', False):
836            kwds['exposed'] = self._exposed_
837            return (RebuildProxy,
838                    (AutoProxy, self._token, self._serializer, kwds))
839        else:
840            return (RebuildProxy,
841                    (type(self), self._token, self._serializer, kwds))
842
843    def __deepcopy__(self, memo):
844        return self._getvalue()
845
846    def __repr__(self):
847        return '<%s object, typeid %r at %s>' % \
848               (type(self).__name__, self._token.typeid, '0x%x' % id(self))
849
850    def __str__(self):
851        '''
852        Return representation of the referent (or a fall-back if that fails)
853        '''
854        try:
855            return self._callmethod('__repr__')
856        except Exception:
857            return repr(self)[:-1] + "; '__str__()' failed>"
858
859#
860# Function used for unpickling
861#
862
863def RebuildProxy(func, token, serializer, kwds):
864    '''
865    Function used for unpickling proxy objects.
866
867    If possible the shared object is returned, or otherwise a proxy for it.
868    '''
869    server = getattr(current_process(), '_manager_server', None)
870
871    if server and server.address == token.address:
872        return server.id_to_obj[token.id][0]
873    else:
874        incref = (
875            kwds.pop('incref', True) and
876            not getattr(current_process(), '_inheriting', False)
877            )
878        return func(token, serializer, incref=incref, **kwds)
879
880#
881# Functions to create proxies and proxy types
882#
883
884def MakeProxyType(name, exposed, _cache={}):
885    '''
886    Return an proxy type whose methods are given by `exposed`
887    '''
888    exposed = tuple(exposed)
889    try:
890        return _cache[(name, exposed)]
891    except KeyError:
892        pass
893
894    dic = {}
895
896    for meth in exposed:
897        exec '''def %s(self, *args, **kwds):
898        return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic
899
900    ProxyType = type(name, (BaseProxy,), dic)
901    ProxyType._exposed_ = exposed
902    _cache[(name, exposed)] = ProxyType
903    return ProxyType
904
905
906def AutoProxy(token, serializer, manager=None, authkey=None,
907              exposed=None, incref=True):
908    '''
909    Return an auto-proxy for `token`
910    '''
911    _Client = listener_client[serializer][1]
912
913    if exposed is None:
914        conn = _Client(token.address, authkey=authkey)
915        try:
916            exposed = dispatch(conn, None, 'get_methods', (token,))
917        finally:
918            conn.close()
919
920    if authkey is None and manager is not None:
921        authkey = manager._authkey
922    if authkey is None:
923        authkey = current_process().authkey
924
925    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
926    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
927                      incref=incref)
928    proxy._isauto = True
929    return proxy
930
931#
932# Types/callables which we will register with SyncManager
933#
934
935class Namespace(object):
936    def __init__(self, **kwds):
937        self.__dict__.update(kwds)
938    def __repr__(self):
939        items = self.__dict__.items()
940        temp = []
941        for name, value in items:
942            if not name.startswith('_'):
943                temp.append('%s=%r' % (name, value))
944        temp.sort()
945        return 'Namespace(%s)' % str.join(', ', temp)
946
947class Value(object):
948    def __init__(self, typecode, value, lock=True):
949        self._typecode = typecode
950        self._value = value
951    def get(self):
952        return self._value
953    def set(self, value):
954        self._value = value
955    def __repr__(self):
956        return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
957    value = property(get, set)
958
959def Array(typecode, sequence, lock=True):
960    return array.array(typecode, sequence)
961
962#
963# Proxy types used by SyncManager
964#
965
966class IteratorProxy(BaseProxy):
967    # XXX remove methods for Py3.0 and Py2.6
968    _exposed_ = ('__next__', 'next', 'send', 'throw', 'close')
969    def __iter__(self):
970        return self
971    def __next__(self, *args):
972        return self._callmethod('__next__', args)
973    def next(self, *args):
974        return self._callmethod('next', args)
975    def send(self, *args):
976        return self._callmethod('send', args)
977    def throw(self, *args):
978        return self._callmethod('throw', args)
979    def close(self, *args):
980        return self._callmethod('close', args)
981
982
983class AcquirerProxy(BaseProxy):
984    _exposed_ = ('acquire', 'release')
985    def acquire(self, blocking=True):
986        return self._callmethod('acquire', (blocking,))
987    def release(self):
988        return self._callmethod('release')
989    def __enter__(self):
990        return self._callmethod('acquire')
991    def __exit__(self, exc_type, exc_val, exc_tb):
992        return self._callmethod('release')
993
994
995class ConditionProxy(AcquirerProxy):
996    # XXX will Condition.notfyAll() name be available in Py3.0?
997    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')
998    def wait(self, timeout=None):
999        return self._callmethod('wait', (timeout,))
1000    def notify(self):
1001        return self._callmethod('notify')
1002    def notify_all(self):
1003        return self._callmethod('notify_all')
1004
1005class EventProxy(BaseProxy):
1006    _exposed_ = ('is_set', 'set', 'clear', 'wait')
1007    def is_set(self):
1008        return self._callmethod('is_set')
1009    def set(self):
1010        return self._callmethod('set')
1011    def clear(self):
1012        return self._callmethod('clear')
1013    def wait(self, timeout=None):
1014        return self._callmethod('wait', (timeout,))
1015
1016class NamespaceProxy(BaseProxy):
1017    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
1018    def __getattr__(self, key):
1019        if key[0] == '_':
1020            return object.__getattribute__(self, key)
1021        callmethod = object.__getattribute__(self, '_callmethod')
1022        return callmethod('__getattribute__', (key,))
1023    def __setattr__(self, key, value):
1024        if key[0] == '_':
1025            return object.__setattr__(self, key, value)
1026        callmethod = object.__getattribute__(self, '_callmethod')
1027        return callmethod('__setattr__', (key, value))
1028    def __delattr__(self, key):
1029        if key[0] == '_':
1030            return object.__delattr__(self, key)
1031        callmethod = object.__getattribute__(self, '_callmethod')
1032        return callmethod('__delattr__', (key,))
1033
1034
1035class ValueProxy(BaseProxy):
1036    _exposed_ = ('get', 'set')
1037    def get(self):
1038        return self._callmethod('get')
1039    def set(self, value):
1040        return self._callmethod('set', (value,))
1041    value = property(get, set)
1042
1043
1044BaseListProxy = MakeProxyType('BaseListProxy', (
1045    '__add__', '__contains__', '__delitem__', '__delslice__',
1046    '__getitem__', '__getslice__', '__len__', '__mul__',
1047    '__reversed__', '__rmul__', '__setitem__', '__setslice__',
1048    'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
1049    'reverse', 'sort', '__imul__'
1050    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
1051class ListProxy(BaseListProxy):
1052    def __iadd__(self, value):
1053        self._callmethod('extend', (value,))
1054        return self
1055    def __imul__(self, value):
1056        self._callmethod('__imul__', (value,))
1057        return self
1058
1059
1060DictProxy = MakeProxyType('DictProxy', (
1061    '__contains__', '__delitem__', '__getitem__', '__len__',
1062    '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items',
1063    'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
1064    ))
1065
1066
1067ArrayProxy = MakeProxyType('ArrayProxy', (
1068    '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__'
1069    ))                  # XXX __getslice__ and __setslice__ unneeded in Py3.0
1070
1071
1072PoolProxy = MakeProxyType('PoolProxy', (
1073    'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
1074    'map', 'map_async', 'terminate'
1075    ))
1076PoolProxy._method_to_typeid_ = {
1077    'apply_async': 'AsyncResult',
1078    'map_async': 'AsyncResult',
1079    'imap': 'Iterator',
1080    'imap_unordered': 'Iterator'
1081    }
1082
1083#
1084# Definition of SyncManager
1085#
1086
1087class SyncManager(BaseManager):
1088    '''
1089    Subclass of `BaseManager` which supports a number of shared object types.
1090
1091    The types registered are those intended for the synchronization
1092    of threads, plus `dict`, `list` and `Namespace`.
1093
1094    The `multiprocessing.Manager()` function creates started instances of
1095    this class.
1096    '''
1097
1098SyncManager.register('Queue', Queue.Queue)
1099SyncManager.register('JoinableQueue', Queue.Queue)
1100SyncManager.register('Event', threading.Event, EventProxy)
1101SyncManager.register('Lock', threading.Lock, AcquirerProxy)
1102SyncManager.register('RLock', threading.RLock, AcquirerProxy)
1103SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
1104SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
1105                     AcquirerProxy)
1106SyncManager.register('Condition', threading.Condition, ConditionProxy)
1107SyncManager.register('Pool', Pool, PoolProxy)
1108SyncManager.register('list', list, ListProxy)
1109SyncManager.register('dict', dict, DictProxy)
1110SyncManager.register('Value', Value, ValueProxy)
1111SyncManager.register('Array', Array, ArrayProxy)
1112SyncManager.register('Namespace', Namespace, NamespaceProxy)
1113
1114# types returned by methods of PoolProxy
1115SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
1116SyncManager.register('AsyncResult', create_method=False)
1117