1# Copyright 2009 Brian Quinlan. All Rights Reserved. 2# Licensed to PSF under a Contributor Agreement. 3 4"""Implements ProcessPoolExecutor. 5 6The follow diagram and text describe the data-flow through the system: 7 8|======================= In-process =====================|== Out-of-process ==| 9 10+----------+ +----------+ +--------+ +-----------+ +---------+ 11| | => | Work Ids | => | | => | Call Q | => | | 12| | +----------+ | | +-----------+ | | 13| | | ... | | | | ... | | | 14| | | 6 | | | | 5, call() | | | 15| | | 7 | | | | ... | | | 16| Process | | ... | | Local | +-----------+ | Process | 17| Pool | +----------+ | Worker | | #1..n | 18| Executor | | Thread | | | 19| | +----------- + | | +-----------+ | | 20| | <=> | Work Items | <=> | | <= | Result Q | <= | | 21| | +------------+ | | +-----------+ | | 22| | | 6: call() | | | | ... | | | 23| | | future | | | | 4, result | | | 24| | | ... | | | | 3, except | | | 25+----------+ +------------+ +--------+ +-----------+ +---------+ 26 27Executor.submit() called: 28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict 29- adds the id of the _WorkItem to the "Work Ids" queue 30 31Local worker thread: 32- reads work ids from the "Work Ids" queue and looks up the corresponding 33 WorkItem from the "Work Items" dict: if the work item has been cancelled then 34 it is simply removed from the dict, otherwise it is repackaged as a 35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" 36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because 37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). 
38- reads _ResultItems from "Result Q", updates the future stored in the 39 "Work Items" dict and deletes the dict entry 40 41Process #1..n: 42- reads _CallItems from "Call Q", executes the calls, and puts the resulting 43 _ResultItems in "Result Q" 44""" 45 46__author__ = 'Brian Quinlan (brian@sweetapp.com)' 47 48import atexit 49import os 50from concurrent.futures import _base 51import queue 52from queue import Full 53import multiprocessing 54from multiprocessing import SimpleQueue 55from multiprocessing.connection import wait 56import threading 57import weakref 58from functools import partial 59import itertools 60import traceback 61 62# Workers are created as daemon threads and processes. This is done to allow the 63# interpreter to exit when there are still idle processes in a 64# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, 65# allowing workers to die with the interpreter has two undesirable properties: 66# - The workers would still be running during interpreter shutdown, 67# meaning that they would fail in unpredictable ways. 68# - The workers could be killed while evaluating a work item, which could 69# be bad if the callable being evaluated has external side-effects e.g. 70# writing to a file. 71# 72# To work around this problem, an exit handler is installed which tells the 73# workers to exit when their work queues are empty and then waits until the 74# threads/processes finish. 75 76_threads_queues = weakref.WeakKeyDictionary() 77_shutdown = False 78 79def _python_exit(): 80 global _shutdown 81 _shutdown = True 82 items = list(_threads_queues.items()) 83 for t, q in items: 84 q.put(None) 85 for t, q in items: 86 t.join() 87 88# Controls how many more calls than processes will be queued in the call queue. 89# A smaller number will mean that processes spend more time idle waiting for 90# work while a larger number will make Future.cancel() succeed less frequently 91# (Futures in the call queue cannot be cancelled). 
EXTRA_QUEUED_CALLS = 1

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    """Exception whose str() is a traceback text formatted in a worker."""
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    """Pickles an exception together with the text of its traceback.

    The traceback is formatted to a string in the worker process; when the
    object is unpickled, _rebuild_exc() chains that text onto the exception
    as a _RemoteTraceback cause.
    """
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    """Unpickling helper: set _RemoteTraceback(tb) as exc.__cause__, return exc."""
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

class _WorkItem(object):
    """A submitted call plus the Future that will receive its outcome."""
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    """Outcome of one call, sent from a worker process back to the parent."""
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    """A call to evaluate, sent from the parent to a worker process."""
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]

def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker. A None item tells the worker to exit.
        result_queue: A multiprocessing.SimpleQueue of _ResultItems that will
            be written to by the worker. Just before exiting, the worker also
            puts its own PID on this queue.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))

def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _CallItems built from pending _WorkItems.

    It returns as soon as call_queue is full or work_ids is empty, and only
    puts after checking that call_queue is not full, so it does not block.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue. Work items whose future was cancelled are dropped
            from pending_work_items instead.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue

def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping worker PIDs to the multiprocessing.Process
            instances used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.SimpleQueue of _ResultItems generated
            by the process workers. It also carries None wake-up messages and
            the PIDs of workers that exited cleanly.
    """
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound on the number of workers still alive.
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for _ in range(nb_children_alive):
            # May raise Full; callers decide how to recover.
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        # Wake up on either a result message or the death of any worker.
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_item in pending_work_items.values():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            try:
                shutdown_worker()
            except Full:
                # The call queue may still hold _CallItems that no worker
                # will ever read, so not every sentinel fits.  The workers
                # were terminated above, so release the queue and reap them
                # here instead of letting Full kill this thread.
                call_queue.close()
                for p in processes.values():
                    p.join()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            # NOTE(review): executor is always None at this point, so this
            # assert cannot fail as written.
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                # Compare against None: an exception object's truthiness is
                # not a reliable "did it fail" signal.
                if result_item.exception is not None:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        # Do not keep the executor alive between iterations.
        executor = None

_system_limits_checked = False
_system_limited = None
def _check_system_limits():
    """Raise NotImplementedError if the OS provides too few semaphores.

    The outcome is cached in module globals, so later calls reuse it instead
    of querying os.sysconf() again.

    Raises:
        NotImplementedError: SC_SEM_NSEMS_MAX is known and below 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
    raise NotImplementedError(_system_limited)


class BrokenProcessPool(RuntimeError):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    """Executor that evaluates submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            ValueError: max_workers is not greater than 0.
            NotImplementedError: the system provides too few semaphores.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the worker processes and management thread on first use."""
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        """Spawn worker processes until there are max_workers of them."""
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return itertools.chain.from_iterable(results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)