1# 2# Module providing the `SyncManager` class for dealing 3# with shared objects 4# 5# multiprocessing/managers.py 6# 7# Copyright (c) 2006-2008, R Oudkerk 8# Licensed to PSF under a Contributor Agreement. 9# 10 11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] 12 13# 14# Imports 15# 16 17import sys 18import threading 19import array 20import queue 21 22from time import time as _time 23from traceback import format_exc 24 25from . import connection 26from .context import reduction, get_spawning_popen 27from . import pool 28from . import process 29from . import util 30from . import get_context 31 32# 33# Register some things for pickling 34# 35 36def reduce_array(a): 37 return array.array, (a.typecode, a.tobytes()) 38reduction.register(array.array, reduce_array) 39 40view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] 41if view_types[0] is not list: # only needed in Py3.0 42 def rebuild_as_list(obj): 43 return list, (list(obj),) 44 for view_type in view_types: 45 reduction.register(view_type, rebuild_as_list) 46 47# 48# Type for identifying shared objects 49# 50 51class Token(object): 52 ''' 53 Type to uniquely indentify a shared object 54 ''' 55 __slots__ = ('typeid', 'address', 'id') 56 57 def __init__(self, typeid, address, id): 58 (self.typeid, self.address, self.id) = (typeid, address, id) 59 60 def __getstate__(self): 61 return (self.typeid, self.address, self.id) 62 63 def __setstate__(self, state): 64 (self.typeid, self.address, self.id) = state 65 66 def __repr__(self): 67 return '%s(typeid=%r, address=%r, id=%r)' % \ 68 (self.__class__.__name__, self.typeid, self.address, self.id) 69 70# 71# Function for communication with a manager's server process 72# 73 74def dispatch(c, id, methodname, args=(), kwds={}): 75 ''' 76 Send a message to manager using connection `c` and return response 77 ''' 78 c.send((id, methodname, args, kwds)) 79 kind, result = c.recv() 80 if kind == '#RETURN': 81 return result 82 raise convert_to_error(kind, result) 83 84def convert_to_error(kind, result): 85 if kind == '#ERROR': 86 return result 87 elif kind == '#TRACEBACK': 88 assert type(result) is str 89 return RemoteError(result) 90 elif kind == '#UNSERIALIZABLE': 91 assert type(result) is str 92 return RemoteError('Unserializable message: %s\n' % result) 93 else: 94 return ValueError('Unrecognized message type') 95 96class RemoteError(Exception): 97 def __str__(self): 98 return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) 99 100# 101# Functions for finding the method names of an object 102# 103 104def all_methods(obj): 105 ''' 106 Return a list of names of methods of `obj` 107 ''' 108 temp = [] 109 for name in dir(obj): 110 func = getattr(obj, name) 111 if callable(func): 112 temp.append(name) 113 return temp 114 115def public_methods(obj): 116 ''' 117 Return a list of names of methods of `obj` which do not start with '_' 118 ''' 119 return [name for name in all_methods(obj) if name[0] != '_'] 120 121# 122# Server which is run in a process controlled by a manager 123# 124 125class Server(object): 126 ''' 127 Server class which runs in a process controlled by a manager object 128 ''' 129 public = ['shutdown', 'create', 'accept_connection', 'get_methods', 130 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] 131 132 def __init__(self, registry, address, authkey, serializer): 133 assert isinstance(authkey, bytes) 134 self.registry = registry 135 self.authkey = process.AuthenticationString(authkey) 136 Listener, Client = listener_client[serializer] 137 138 # do authentication later 139 self.listener = Listener(address=address, backlog=16) 140 self.address = self.listener.address 141 142 self.id_to_obj = {'0': (None, ())} 143 self.id_to_refcount = {} 144 self.id_to_local_proxy_obj = {} 145 self.mutex = threading.Lock() 146 147 def serve_forever(self): 148 ''' 149 Run the server forever 150 ''' 151 self.stop_event = threading.Event() 152 process.current_process()._manager_server = self 153 try: 154 accepter = threading.Thread(target=self.accepter) 155 accepter.daemon = True 156 accepter.start() 157 try: 158 while not self.stop_event.is_set(): 159 self.stop_event.wait(1) 160 except (KeyboardInterrupt, SystemExit): 161 pass 162 finally: 163 if sys.stdout != sys.__stdout__: 164 util.debug('resetting stdout, stderr') 165 sys.stdout = sys.__stdout__ 166 sys.stderr = sys.__stderr__ 167 sys.exit(0) 168 169 def accepter(self): 170 while True: 171 try: 172 c = self.listener.accept() 173 except OSError: 174 continue 175 t = threading.Thread(target=self.handle_request, args=(c,)) 176 t.daemon = True 177 t.start() 178 179 def handle_request(self, c): 180 ''' 181 Handle a new connection 182 ''' 183 funcname = result = request = None 184 try: 185 connection.deliver_challenge(c, self.authkey) 186 connection.answer_challenge(c, self.authkey) 187 request = c.recv() 188 ignore, funcname, args, kwds = request 189 assert funcname in self.public, '%r unrecognized' % funcname 190 func = getattr(self, funcname) 191 except Exception: 192 msg = ('#TRACEBACK', format_exc()) 193 else: 194 try: 195 result = func(c, *args, **kwds) 196 except Exception: 197 msg = ('#TRACEBACK', format_exc()) 198 else: 199 msg = ('#RETURN', result) 200 try: 201 c.send(msg) 202 except Exception as e: 203 try: 204 c.send(('#TRACEBACK', format_exc())) 205 except Exception: 206 pass 207 util.info('Failure to send message: %r', msg) 208 util.info(' ... request was %r', request) 209 util.info(' ... exception was %r', e) 210 211 c.close() 212 213 def serve_client(self, conn): 214 ''' 215 Handle requests from the proxies in a particular process/thread 216 ''' 217 util.debug('starting server thread to service %r', 218 threading.current_thread().name) 219 220 recv = conn.recv 221 send = conn.send 222 id_to_obj = self.id_to_obj 223 224 while not self.stop_event.is_set(): 225 226 try: 227 methodname = obj = None 228 request = recv() 229 ident, methodname, args, kwds = request 230 try: 231 obj, exposed, gettypeid = id_to_obj[ident] 232 except KeyError as ke: 233 try: 234 obj, exposed, gettypeid = \ 235 self.id_to_local_proxy_obj[ident] 236 except KeyError as second_ke: 237 raise ke 238 239 if methodname not in exposed: 240 raise AttributeError( 241 'method %r of %r object is not in exposed=%r' % 242 (methodname, type(obj), exposed) 243 ) 244 245 function = getattr(obj, methodname) 246 247 try: 248 res = function(*args, **kwds) 249 except Exception as e: 250 msg = ('#ERROR', e) 251 else: 252 typeid = gettypeid and gettypeid.get(methodname, None) 253 if typeid: 254 rident, rexposed = self.create(conn, typeid, res) 255 token = Token(typeid, self.address, rident) 256 msg = ('#PROXY', (rexposed, token)) 257 else: 258 msg = ('#RETURN', res) 259 260 except AttributeError: 261 if methodname is None: 262 msg = ('#TRACEBACK', format_exc()) 263 else: 264 try: 265 fallback_func = self.fallback_mapping[methodname] 266 result = fallback_func( 267 self, conn, ident, obj, *args, **kwds 268 ) 269 msg = ('#RETURN', result) 270 except Exception: 271 msg = ('#TRACEBACK', format_exc()) 272 273 except EOFError: 274 util.debug('got EOF -- exiting thread serving %r', 275 threading.current_thread().name) 276 sys.exit(0) 277 278 except Exception: 279 msg = ('#TRACEBACK', format_exc()) 280 281 try: 282 try: 283 send(msg) 284 except Exception as e: 285 send(('#UNSERIALIZABLE', format_exc())) 286 except Exception as e: 287 util.info('exception in thread serving %r', 288 threading.current_thread().name) 289 util.info(' ... message was %r', msg) 290 util.info(' ... exception was %r', e) 291 conn.close() 292 sys.exit(1) 293 294 def fallback_getvalue(self, conn, ident, obj): 295 return obj 296 297 def fallback_str(self, conn, ident, obj): 298 return str(obj) 299 300 def fallback_repr(self, conn, ident, obj): 301 return repr(obj) 302 303 fallback_mapping = { 304 '__str__':fallback_str, 305 '__repr__':fallback_repr, 306 '#GETVALUE':fallback_getvalue 307 } 308 309 def dummy(self, c): 310 pass 311 312 def debug_info(self, c): 313 ''' 314 Return some info --- useful to spot problems with refcounting 315 ''' 316 with self.mutex: 317 result = [] 318 keys = list(self.id_to_refcount.keys()) 319 keys.sort() 320 for ident in keys: 321 if ident != '0': 322 result.append(' %s: refcount=%s\n %s' % 323 (ident, self.id_to_refcount[ident], 324 str(self.id_to_obj[ident][0])[:75])) 325 return '\n'.join(result) 326 327 def number_of_objects(self, c): 328 ''' 329 Number of shared objects 330 ''' 331 # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0' 332 return len(self.id_to_refcount) 333 334 def shutdown(self, c): 335 ''' 336 Shutdown this process 337 ''' 338 try: 339 util.debug('manager received shutdown message') 340 c.send(('#RETURN', None)) 341 except: 342 import traceback 343 traceback.print_exc() 344 finally: 345 self.stop_event.set() 346 347 def create(self, c, typeid, *args, **kwds): 348 ''' 349 Create a new shared object and return its id 350 ''' 351 with self.mutex: 352 callable, exposed, method_to_typeid, proxytype = \ 353 self.registry[typeid] 354 355 if callable is None: 356 assert len(args) == 1 and not kwds 357 obj = args[0] 358 else: 359 obj = callable(*args, **kwds) 360 361 if exposed is None: 362 exposed = public_methods(obj) 363 if method_to_typeid is not None: 364 assert type(method_to_typeid) is dict 365 exposed = list(exposed) + list(method_to_typeid) 366 367 ident = '%x' % id(obj) # convert to string because xmlrpclib 368 # only has 32 bit signed integers 369 util.debug('%r callable returned object with id %r', typeid, ident) 370 371 self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) 372 if ident not in self.id_to_refcount: 373 self.id_to_refcount[ident] = 0 374 375 self.incref(c, ident) 376 return ident, tuple(exposed) 377 378 def get_methods(self, c, token): 379 ''' 380 Return the methods of the shared object indicated by token 381 ''' 382 return tuple(self.id_to_obj[token.id][1]) 383 384 def accept_connection(self, c, name): 385 ''' 386 Spawn a new thread to serve this connection 387 ''' 388 threading.current_thread().name = name 389 c.send(('#RETURN', None)) 390 self.serve_client(c) 391 392 def incref(self, c, ident): 393 with self.mutex: 394 try: 395 self.id_to_refcount[ident] += 1 396 except KeyError as ke: 397 # If no external references exist but an internal (to the 398 # manager) still does and a new external reference is created 399 # from it, restore the manager's tracking of it from the 400 # previously stashed internal ref. 401 if ident in self.id_to_local_proxy_obj: 402 self.id_to_refcount[ident] = 1 403 self.id_to_obj[ident] = \ 404 self.id_to_local_proxy_obj[ident] 405 obj, exposed, gettypeid = self.id_to_obj[ident] 406 util.debug('Server re-enabled tracking & INCREF %r', ident) 407 else: 408 raise ke 409 410 def decref(self, c, ident): 411 if ident not in self.id_to_refcount and \ 412 ident in self.id_to_local_proxy_obj: 413 util.debug('Server DECREF skipping %r', ident) 414 return 415 416 with self.mutex: 417 assert self.id_to_refcount[ident] >= 1 418 self.id_to_refcount[ident] -= 1 419 if self.id_to_refcount[ident] == 0: 420 del self.id_to_refcount[ident] 421 422 if ident not in self.id_to_refcount: 423 # Two-step process in case the object turns out to contain other 424 # proxy objects (e.g. a managed list of managed lists). 425 # Otherwise, deleting self.id_to_obj[ident] would trigger the 426 # deleting of the stored value (another managed object) which would 427 # in turn attempt to acquire the mutex that is already held here. 428 self.id_to_obj[ident] = (None, (), None) # thread-safe 429 util.debug('disposing of obj with id %r', ident) 430 with self.mutex: 431 del self.id_to_obj[ident] 432 433 434# 435# Class to represent state of a manager 436# 437 438class State(object): 439 __slots__ = ['value'] 440 INITIAL = 0 441 STARTED = 1 442 SHUTDOWN = 2 443 444# 445# Mapping from serializer name to Listener and Client types 446# 447 448listener_client = { 449 'pickle' : (connection.Listener, connection.Client), 450 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) 451 } 452 453# 454# Definition of BaseManager 455# 456 457class BaseManager(object): 458 ''' 459 Base class for managers 460 ''' 461 _registry = {} 462 _Server = Server 463 464 def __init__(self, address=None, authkey=None, serializer='pickle', 465 ctx=None): 466 if authkey is None: 467 authkey = process.current_process().authkey 468 self._address = address # XXX not final address if eg ('', 0) 469 self._authkey = process.AuthenticationString(authkey) 470 self._state = State() 471 self._state.value = State.INITIAL 472 self._serializer = serializer 473 self._Listener, self._Client = listener_client[serializer] 474 self._ctx = ctx or get_context() 475 476 def get_server(self): 477 ''' 478 Return server object with serve_forever() method and address attribute 479 ''' 480 assert self._state.value == State.INITIAL 481 return Server(self._registry, self._address, 482 self._authkey, self._serializer) 483 484 def connect(self): 485 ''' 486 Connect manager object to the server process 487 ''' 488 Listener, Client = listener_client[self._serializer] 489 conn = Client(self._address, authkey=self._authkey) 490 dispatch(conn, None, 'dummy') 491 self._state.value = State.STARTED 492 493 def start(self, initializer=None, initargs=()): 494 ''' 495 Spawn a server process for this manager object 496 ''' 497 assert self._state.value == State.INITIAL 498 499 if initializer is not None and not callable(initializer): 500 raise TypeError('initializer must be a callable') 501 502 # pipe over which we will retrieve address of server 503 reader, writer = connection.Pipe(duplex=False) 504 505 # spawn process which runs a server 506 self._process = self._ctx.Process( 507 target=type(self)._run_server, 508 args=(self._registry, self._address, self._authkey, 509 self._serializer, writer, initializer, initargs), 510 ) 511 ident = ':'.join(str(i) for i in self._process._identity) 512 self._process.name = type(self).__name__ + '-' + ident 513 self._process.start() 514 515 # get address of server 516 writer.close() 517 self._address = reader.recv() 518 reader.close() 519 520 # register a finalizer 521 self._state.value = State.STARTED 522 self.shutdown = util.Finalize( 523 self, type(self)._finalize_manager, 524 args=(self._process, self._address, self._authkey, 525 self._state, self._Client), 526 exitpriority=0 527 ) 528 529 @classmethod 530 def _run_server(cls, registry, address, authkey, serializer, writer, 531 initializer=None, initargs=()): 532 ''' 533 Create a server, report its address and run it 534 ''' 535 if initializer is not None: 536 initializer(*initargs) 537 538 # create server 539 server = cls._Server(registry, address, authkey, serializer) 540 541 # inform parent process of the server's address 542 writer.send(server.address) 543 writer.close() 544 545 # run the manager 546 util.info('manager serving at %r', server.address) 547 server.serve_forever() 548 549 def _create(self, typeid, *args, **kwds): 550 ''' 551 Create a new shared object; return the token and exposed tuple 552 ''' 553 assert self._state.value == State.STARTED, 'server not yet started' 554 conn = self._Client(self._address, authkey=self._authkey) 555 try: 556 id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) 557 finally: 558 conn.close() 559 return Token(typeid, self._address, id), exposed 560 561 def join(self, timeout=None): 562 ''' 563 Join the manager process (if it has been spawned) 564 ''' 565 if self._process is not None: 566 self._process.join(timeout) 567 if not self._process.is_alive(): 568 self._process = None 569 570 def _debug_info(self): 571 ''' 572 Return some info about the servers shared objects and connections 573 ''' 574 conn = self._Client(self._address, authkey=self._authkey) 575 try: 576 return dispatch(conn, None, 'debug_info') 577 finally: 578 conn.close() 579 580 def _number_of_objects(self): 581 ''' 582 Return the number of shared objects 583 ''' 584 conn = self._Client(self._address, authkey=self._authkey) 585 try: 586 return dispatch(conn, None, 'number_of_objects') 587 finally: 588 conn.close() 589 590 def __enter__(self): 591 if self._state.value == State.INITIAL: 592 self.start() 593 assert self._state.value == State.STARTED 594 return self 595 596 def __exit__(self, exc_type, exc_val, exc_tb): 597 self.shutdown() 598 599 @staticmethod 600 def _finalize_manager(process, address, authkey, state, _Client): 601 ''' 602 Shutdown the manager process; will be registered as a finalizer 603 ''' 604 if process.is_alive(): 605 util.info('sending shutdown message to manager') 606 try: 607 conn = _Client(address, authkey=authkey) 608 try: 609 dispatch(conn, None, 'shutdown') 610 finally: 611 conn.close() 612 except Exception: 613 pass 614 615 process.join(timeout=1.0) 616 if process.is_alive(): 617 util.info('manager still alive') 618 if hasattr(process, 'terminate'): 619 util.info('trying to `terminate()` manager process') 620 process.terminate() 621 process.join(timeout=0.1) 622 if process.is_alive(): 623 util.info('manager still alive after terminate') 624 625 state.value = State.SHUTDOWN 626 try: 627 del BaseProxy._address_to_local[address] 628 except KeyError: 629 pass 630 631 address = property(lambda self: self._address) 632 633 @classmethod 634 def register(cls, typeid, callable=None, proxytype=None, exposed=None, 635 method_to_typeid=None, create_method=True): 636 ''' 637 Register a typeid with the manager type 638 ''' 639 if '_registry' not in cls.__dict__: 640 cls._registry = cls._registry.copy() 641 642 if proxytype is None: 643 proxytype = AutoProxy 644 645 exposed = exposed or getattr(proxytype, '_exposed_', None) 646 647 method_to_typeid = method_to_typeid or \ 648 getattr(proxytype, '_method_to_typeid_', None) 649 650 if method_to_typeid: 651 for key, value in list(method_to_typeid.items()): 652 assert type(key) is str, '%r is not a string' % key 653 assert type(value) is str, '%r is not a string' % value 654 655 cls._registry[typeid] = ( 656 callable, exposed, method_to_typeid, proxytype 657 ) 658 659 if create_method: 660 def temp(self, *args, **kwds): 661 util.debug('requesting creation of a shared %r object', typeid) 662 token, exp = self._create(typeid, *args, **kwds) 663 proxy = proxytype( 664 token, self._serializer, manager=self, 665 authkey=self._authkey, exposed=exp 666 ) 667 conn = self._Client(token.address, authkey=self._authkey) 668 dispatch(conn, None, 'decref', (token.id,)) 669 return proxy 670 temp.__name__ = typeid 671 setattr(cls, typeid, temp) 672 673# 674# Subclass of set which get cleared after a fork 675# 676 677class ProcessLocalSet(set): 678 def __init__(self): 679 util.register_after_fork(self, lambda obj: obj.clear()) 680 def __reduce__(self): 681 return type(self), () 682 683# 684# Definition of BaseProxy 685# 686 687class BaseProxy(object): 688 ''' 689 A base for proxies of shared objects 690 ''' 691 _address_to_local = {} 692 _mutex = util.ForkAwareThreadLock() 693 694 def __init__(self, token, serializer, manager=None, 695 authkey=None, exposed=None, incref=True, manager_owned=False): 696 with BaseProxy._mutex: 697 tls_idset = BaseProxy._address_to_local.get(token.address, None) 698 if tls_idset is None: 699 tls_idset = util.ForkAwareLocal(), ProcessLocalSet() 700 BaseProxy._address_to_local[token.address] = tls_idset 701 702 # self._tls is used to record the connection used by this 703 # thread to communicate with the manager at token.address 704 self._tls = tls_idset[0] 705 706 # self._idset is used to record the identities of all shared 707 # objects for which the current process owns references and 708 # which are in the manager at token.address 709 self._idset = tls_idset[1] 710 711 self._token = token 712 self._id = self._token.id 713 self._manager = manager 714 self._serializer = serializer 715 self._Client = listener_client[serializer][1] 716 717 # Should be set to True only when a proxy object is being created 718 # on the manager server; primary use case: nested proxy objects. 719 # RebuildProxy detects when a proxy is being created on the manager 720 # and sets this value appropriately. 721 self._owned_by_manager = manager_owned 722 723 if authkey is not None: 724 self._authkey = process.AuthenticationString(authkey) 725 elif self._manager is not None: 726 self._authkey = self._manager._authkey 727 else: 728 self._authkey = process.current_process().authkey 729 730 if incref: 731 self._incref() 732 733 util.register_after_fork(self, BaseProxy._after_fork) 734 735 def _connect(self): 736 util.debug('making connection to manager') 737 name = process.current_process().name 738 if threading.current_thread().name != 'MainThread': 739 name += '|' + threading.current_thread().name 740 conn = self._Client(self._token.address, authkey=self._authkey) 741 dispatch(conn, None, 'accept_connection', (name,)) 742 self._tls.connection = conn 743 744 def _callmethod(self, methodname, args=(), kwds={}): 745 ''' 746 Try to call a method of the referrent and return a copy of the result 747 ''' 748 try: 749 conn = self._tls.connection 750 except AttributeError: 751 util.debug('thread %r does not own a connection', 752 threading.current_thread().name) 753 self._connect() 754 conn = self._tls.connection 755 756 conn.send((self._id, methodname, args, kwds)) 757 kind, result = conn.recv() 758 759 if kind == '#RETURN': 760 return result 761 elif kind == '#PROXY': 762 exposed, token = result 763 proxytype = self._manager._registry[token.typeid][-1] 764 token.address = self._token.address 765 proxy = proxytype( 766 token, self._serializer, manager=self._manager, 767 authkey=self._authkey, exposed=exposed 768 ) 769 conn = self._Client(token.address, authkey=self._authkey) 770 dispatch(conn, None, 'decref', (token.id,)) 771 return proxy 772 raise convert_to_error(kind, result) 773 774 def _getvalue(self): 775 ''' 776 Get a copy of the value of the referent 777 ''' 778 return self._callmethod('#GETVALUE') 779 780 def _incref(self): 781 if self._owned_by_manager: 782 util.debug('owned_by_manager skipped INCREF of %r', self._token.id) 783 return 784 785 conn = self._Client(self._token.address, authkey=self._authkey) 786 dispatch(conn, None, 'incref', (self._id,)) 787 util.debug('INCREF %r', self._token.id) 788 789 self._idset.add(self._id) 790 791 state = self._manager and self._manager._state 792 793 self._close = util.Finalize( 794 self, BaseProxy._decref, 795 args=(self._token, self._authkey, state, 796 self._tls, self._idset, self._Client), 797 exitpriority=10 798 ) 799 800 @staticmethod 801 def _decref(token, authkey, state, tls, idset, _Client): 802 idset.discard(token.id) 803 804 # check whether manager is still alive 805 if state is None or state.value == State.STARTED: 806 # tell manager this process no longer cares about referent 807 try: 808 util.debug('DECREF %r', token.id) 809 conn = _Client(token.address, authkey=authkey) 810 dispatch(conn, None, 'decref', (token.id,)) 811 except Exception as e: 812 util.debug('... decref failed %s', e) 813 814 else: 815 util.debug('DECREF %r -- manager already shutdown', token.id) 816 817 # check whether we can close this thread's connection because 818 # the process owns no more references to objects for this manager 819 if not idset and hasattr(tls, 'connection'): 820 util.debug('thread %r has no more proxies so closing conn', 821 threading.current_thread().name) 822 tls.connection.close() 823 del tls.connection 824 825 def _after_fork(self): 826 self._manager = None 827 try: 828 self._incref() 829 except Exception as e: 830 # the proxy may just be for a manager which has shutdown 831 util.info('incref failed: %s' % e) 832 833 def __reduce__(self): 834 kwds = {} 835 if get_spawning_popen() is not None: 836 kwds['authkey'] = self._authkey 837 838 if getattr(self, '_isauto', False): 839 kwds['exposed'] = self._exposed_ 840 return (RebuildProxy, 841 (AutoProxy, self._token, self._serializer, kwds)) 842 else: 843 return (RebuildProxy, 844 (type(self), self._token, self._serializer, kwds)) 845 846 def __deepcopy__(self, memo): 847 return self._getvalue() 848 849 def __repr__(self): 850 return '<%s object, typeid %r at %#x>' % \ 851 (type(self).__name__, self._token.typeid, id(self)) 852 853 def __str__(self): 854 ''' 855 Return representation of the referent (or a fall-back if that fails) 856 ''' 857 try: 858 return self._callmethod('__repr__') 859 except Exception: 860 return repr(self)[:-1] + "; '__str__()' failed>" 861 862# 863# Function used for unpickling 864# 865 866def RebuildProxy(func, token, serializer, kwds): 867 ''' 868 Function used for unpickling proxy objects. 869 ''' 870 server = getattr(process.current_process(), '_manager_server', None) 871 if server and server.address == token.address: 872 util.debug('Rebuild a proxy owned by manager, token=%r', token) 873 kwds['manager_owned'] = True 874 if token.id not in server.id_to_local_proxy_obj: 875 server.id_to_local_proxy_obj[token.id] = \ 876 server.id_to_obj[token.id] 877 incref = ( 878 kwds.pop('incref', True) and 879 not getattr(process.current_process(), '_inheriting', False) 880 ) 881 return func(token, serializer, incref=incref, **kwds) 882 883# 884# Functions to create proxies and proxy types 885# 886 887def MakeProxyType(name, exposed, _cache={}): 888 ''' 889 Return a proxy type whose methods are given by `exposed` 890 ''' 891 exposed = tuple(exposed) 892 try: 893 return _cache[(name, exposed)] 894 except KeyError: 895 pass 896 897 dic = {} 898 899 for meth in exposed: 900 exec('''def %s(self, *args, **kwds): 901 return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) 902 903 ProxyType = type(name, (BaseProxy,), dic) 904 ProxyType._exposed_ = exposed 905 _cache[(name, exposed)] = ProxyType 906 return ProxyType 907 908 909def AutoProxy(token, serializer, manager=None, authkey=None, 910 exposed=None, incref=True): 911 ''' 912 Return an auto-proxy for `token` 913 ''' 914 _Client = listener_client[serializer][1] 915 916 if exposed is None: 917 conn = _Client(token.address, authkey=authkey) 918 try: 919 exposed = dispatch(conn, None, 'get_methods', (token,)) 920 finally: 921 conn.close() 922 923 if authkey is None and manager is not None: 924 authkey = manager._authkey 925 if authkey is None: 926 authkey = process.current_process().authkey 927 928 ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) 929 proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, 930 incref=incref) 931 proxy._isauto = True 932 return proxy 933 934# 935# Types/callables which we will register with SyncManager 936# 937 938class Namespace(object): 939 def __init__(self, **kwds): 940 self.__dict__.update(kwds) 941 def __repr__(self): 942 items = list(self.__dict__.items()) 943 temp = [] 944 for name, value in items: 945 if not name.startswith('_'): 946 temp.append('%s=%r' % (name, value)) 947 temp.sort() 948 return '%s(%s)' % (self.__class__.__name__, ', '.join(temp)) 949 950class Value(object): 951 def __init__(self, typecode, value, lock=True): 952 self._typecode = typecode 953 self._value = value 954 def get(self): 955 return self._value 956 def set(self, value): 957 self._value = value 958 def __repr__(self): 959 return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) 960 value = property(get, set) 961 962def Array(typecode, sequence, lock=True): 963 return array.array(typecode, sequence) 964 965# 966# Proxy types used by SyncManager 967# 968 969class IteratorProxy(BaseProxy): 970 _exposed_ = ('__next__', 'send', 'throw', 'close') 971 def __iter__(self): 972 return self 973 def __next__(self, *args): 974 return self._callmethod('__next__', args) 975 def send(self, *args): 976 return self._callmethod('send', args) 977 def throw(self, *args): 978 return self._callmethod('throw', args) 979 def close(self, *args): 980 return self._callmethod('close', args) 981 982 983class AcquirerProxy(BaseProxy): 984 _exposed_ = ('acquire', 'release') 985 def acquire(self, blocking=True, timeout=None): 986 args = (blocking,) if timeout is None else (blocking, timeout) 987 return self._callmethod('acquire', args) 988 def release(self): 989 return self._callmethod('release') 990 def __enter__(self): 991 return self._callmethod('acquire') 992 def __exit__(self, exc_type, exc_val, exc_tb): 993 return self._callmethod('release') 994 995 996class ConditionProxy(AcquirerProxy): 997 _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') 998 def wait(self, timeout=None): 999 return self._callmethod('wait', (timeout,)) 1000 def notify(self): 1001 return self._callmethod('notify') 1002 def notify_all(self): 1003 return self._callmethod('notify_all') 1004 def wait_for(self, predicate, timeout=None): 1005 result = predicate() 1006 if result: 1007 return result 1008 if timeout is not None: 1009 endtime = _time() + timeout 1010 else: 1011 endtime = None 1012 waittime = None 1013 while not result: 1014 if endtime is not None: 1015 waittime = endtime - _time() 1016 if waittime <= 0: 1017 break 1018 self.wait(waittime) 1019 result = predicate() 1020 return result 1021 1022 1023class EventProxy(BaseProxy): 1024 _exposed_ = ('is_set', 'set', 'clear', 'wait') 1025 def is_set(self): 1026 return self._callmethod('is_set') 1027 def set(self): 1028 return self._callmethod('set') 1029 def clear(self): 1030 return self._callmethod('clear') 1031 def wait(self, timeout=None): 1032 return self._callmethod('wait', (timeout,)) 1033 1034 1035class BarrierProxy(BaseProxy): 1036 _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset') 1037 def wait(self, timeout=None): 1038 return self._callmethod('wait', (timeout,)) 1039 def abort(self): 1040 return self._callmethod('abort') 1041 def reset(self): 1042 return self._callmethod('reset') 1043 @property 1044 def parties(self): 1045 return self._callmethod('__getattribute__', ('parties',)) 1046 @property 1047 def n_waiting(self): 1048 return self._callmethod('__getattribute__', ('n_waiting',)) 1049 @property 1050 def broken(self): 1051 return self._callmethod('__getattribute__', ('broken',)) 1052 1053 1054class NamespaceProxy(BaseProxy): 1055 _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') 1056 def __getattr__(self, key): 1057 if key[0] == '_': 1058 return object.__getattribute__(self, key) 1059 callmethod = object.__getattribute__(self, '_callmethod') 1060 return callmethod('__getattribute__', (key,)) 1061 def __setattr__(self, key, value): 1062 if key[0] == '_': 1063 return object.__setattr__(self, key, value) 1064 callmethod = object.__getattribute__(self, '_callmethod') 1065 return callmethod('__setattr__', (key, value)) 1066 def __delattr__(self, key): 1067 if key[0] == '_': 1068 return object.__delattr__(self, key) 1069 callmethod = object.__getattribute__(self, '_callmethod') 1070 return callmethod('__delattr__', (key,)) 1071 1072 1073class ValueProxy(BaseProxy): 1074 _exposed_ = ('get', 'set') 1075 def get(self): 1076 return self._callmethod('get') 1077 def set(self, value): 1078 return self._callmethod('set', (value,)) 1079 value = property(get, set) 1080 1081 1082BaseListProxy = MakeProxyType('BaseListProxy', ( 1083 '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', 1084 '__mul__', '__reversed__', '__rmul__', '__setitem__', 1085 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 1086 'reverse', 'sort', '__imul__' 1087 )) 1088class ListProxy(BaseListProxy): 1089 def __iadd__(self, value): 1090 self._callmethod('extend', (value,)) 1091 return self 1092 def __imul__(self, value): 1093 self._callmethod('__imul__', (value,)) 1094 return self 1095 1096 1097DictProxy = MakeProxyType('DictProxy', ( 1098 '__contains__', '__delitem__', '__getitem__', '__len__', 1099 '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items', 1100 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' 1101 )) 1102 1103 1104ArrayProxy = MakeProxyType('ArrayProxy', ( 1105 '__len__', '__getitem__', '__setitem__' 1106 )) 1107 1108 1109BasePoolProxy = MakeProxyType('PoolProxy', ( 1110 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 1111 'map', 'map_async', 'starmap', 'starmap_async', 'terminate', 1112 )) 1113BasePoolProxy._method_to_typeid_ = { 1114 'apply_async': 'AsyncResult', 1115 'map_async': 'AsyncResult', 1116 'starmap_async': 'AsyncResult', 1117 'imap': 'Iterator', 1118 'imap_unordered': 'Iterator' 1119 } 1120class PoolProxy(BasePoolProxy): 1121 def __enter__(self): 1122 return self 1123 def __exit__(self, exc_type, exc_val, exc_tb): 1124 self.terminate() 1125 1126# 1127# Definition of SyncManager 1128# 1129 1130class SyncManager(BaseManager): 1131 ''' 1132 Subclass of `BaseManager` which supports a number of shared object types. 1133 1134 The types registered are those intended for the synchronization 1135 of threads, plus `dict`, `list` and `Namespace`. 1136 1137 The `multiprocessing.Manager()` function creates started instances of 1138 this class. 1139 ''' 1140 1141SyncManager.register('Queue', queue.Queue) 1142SyncManager.register('JoinableQueue', queue.Queue) 1143SyncManager.register('Event', threading.Event, EventProxy) 1144SyncManager.register('Lock', threading.Lock, AcquirerProxy) 1145SyncManager.register('RLock', threading.RLock, AcquirerProxy) 1146SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) 1147SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, 1148 AcquirerProxy) 1149SyncManager.register('Condition', threading.Condition, ConditionProxy) 1150SyncManager.register('Barrier', threading.Barrier, BarrierProxy) 1151SyncManager.register('Pool', pool.Pool, PoolProxy) 1152SyncManager.register('list', list, ListProxy) 1153SyncManager.register('dict', dict, DictProxy) 1154SyncManager.register('Value', Value, ValueProxy) 1155SyncManager.register('Array', Array, ArrayProxy) 1156SyncManager.register('Namespace', Namespace, NamespaceProxy) 1157 1158# types returned by methods of PoolProxy 1159SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) 1160SyncManager.register('AsyncResult', create_method=False) 1161