1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
The following diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+     +----------+       +--------+     +-----------+    +---------+
11|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
12|          |     +----------+       |        |     +-----------+    |         |
13|          |     | ...      |       |        |     | ...       |    |         |
14|          |     | 6        |       |        |     | 5, call() |    |         |
15|          |     | 7        |       |        |     | ...       |    |         |
16| Process  |     | ...      |       | Local  |     +-----------+    | Process |
17|  Pool    |     +----------+       | Worker |                      |  #1..n  |
18| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
20|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
21|          |     +------------+     |        |     +-----------+    |         |
22|          |     | 6: call()  |     |        |     | ...       |    |         |
23|          |     |    future  |     |        |     | 4, result |    |         |
24|          |     | ...        |     |        |     | 3, except |    |         |
25+----------+     +------------+     +--------+     +-----------+    +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33  WorkItem from the "Work Items" dict: if the work item has been cancelled then
34  it is simply removed from the dict, otherwise it is repackaged as a
35  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39  "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
43  _ResultItems in "Result Q"
44"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import atexit
49import os
50from concurrent.futures import _base
51import queue
52from queue import Full
53import multiprocessing
54from multiprocessing import SimpleQueue
55from multiprocessing.connection import wait
56import threading
57import weakref
58from functools import partial
59import itertools
60import traceback
61
62# Workers are created as daemon threads and processes. This is done to allow the
63# interpreter to exit when there are still idle processes in a
64# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
65# allowing workers to die with the interpreter has two undesirable properties:
66#   - The workers would still be running during interpreter shutdown,
67#     meaning that they would fail in unpredictable ways.
68#   - The workers could be killed while evaluating a work item, which could
69#     be bad if the callable being evaluated has external side-effects e.g.
70#     writing to a file.
71#
72# To work around this problem, an exit handler is installed which tells the
73# workers to exit when their work queues are empty and then waits until the
74# threads/processes finish.
75
# Registry of queue management threads, each mapped to the queue used to wake
# it up. Keys are held weakly.
_threads_queues = weakref.WeakKeyDictionary()
# Becomes True once the exit handler below has run.
_shutdown = False

def _python_exit():
    """Exit handler: flag shutdown, wake every registered thread, join them.

    A ``None`` is put on each registered queue before any thread is joined,
    so all threads are woken up front rather than one at a time.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the registry so both passes see the same (thread, queue) pairs.
    registered = list(_threads_queues.items())
    for _, wakeup_queue in registered:
        wakeup_queue.put(None)
    for thread, _ in registered:
        thread.join()
87
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor.__init__ adds this value to max_workers to size the
# call queue.
EXTRA_QUEUED_CALLS = 1
93
94# Hack to embed stringification of remote traceback in local traceback
95
class _RemoteTraceback(Exception):
    """Exception whose string form is a pre-formatted traceback text.

    Args:
        tb: The traceback text; stored as ``self.tb`` and returned verbatim
            by ``str()``.
    """

    def __init__(self, tb):
        self.tb = tb

    def __str__(self):
        # Show the stored text as-is instead of the default args rendering.
        return self.tb
101
class _ExceptionWithTraceback:
    """Bundles an exception with the text of its formatted traceback.

    Pickling an instance goes through ``__reduce__``, so unpickling yields
    whatever ``_rebuild_exc(exc, tb)`` returns rather than an instance of
    this class.

    Args:
        exc: The exception instance.
        tb: The traceback object to format alongside ``exc``.
    """

    def __init__(self, exc, tb):
        formatted = ''.join(traceback.format_exception(type(exc), exc, tb))
        self.exc = exc
        # Wrap the text in triple quotes so it stands out when displayed.
        self.tb = '\n"""\n%s"""' % formatted

    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)
110
def _rebuild_exc(exc, tb):
    """Return *exc* with a _RemoteTraceback built from *tb* as its cause.

    Args:
        exc: The exception instance to annotate (modified in place).
        tb: The traceback text to embed.
    """
    remote_cause = _RemoteTraceback(tb)
    exc.__cause__ = remote_cause
    return exc
114
class _WorkItem:
    """A submitted call together with the future that reports its outcome.

    Attributes are plain storage: ``future``, ``fn``, ``args``, ``kwargs``.
    """

    def __init__(self, future, fn, args, kwargs):
        # The future handed back to the submitter.
        self.future = future
        # The callable and the arguments it is to be invoked with.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
121
class _ResultItem:
    """Outcome of one call, identified by the work id it belongs to.

    Args:
        work_id: Id of the work item this outcome is for.
        exception: The error produced by the call, or None.
        result: The value produced by the call, or None.
    """

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        # Outcome fields; both default to None.
        self.exception = exception
        self.result = result
127
class _CallItem:
    """A call to execute, tagged with the work id it was created from.

    Attributes are plain storage: ``work_id``, ``fn``, ``args``, ``kwargs``.
    """

    def __init__(self, work_id, fn, args, kwargs):
        # Ties the eventual result back to the originating work item.
        self.work_id = work_id
        # The callable and the arguments it is to be invoked with.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
134
def _get_chunks(*iterables, chunksize):
    """Yield tuples of up to *chunksize* items taken from zip(*iterables).

    Each item is itself a tuple with one element per input iterable.
    Iteration ends when no further items can be taken.
    """
    zipped = zip(*iterables)

    def next_chunk():
        return tuple(itertools.islice(zipped, chunksize))

    # Keep taking chunks until one comes back empty.
    yield from iter(next_chunk, ())
143
def _process_chunk(fn, chunk):
    """Apply *fn* to every argument tuple in *chunk* and return the results.

    Each element of *chunk* is unpacked into the positional arguments of one
    call to *fn*; the return values are collected, in order, into a list.

    This function is run in a separate process.
    """
    return list(itertools.starmap(fn, chunk))
154
def _process_worker(call_queue, result_queue):
    """Run calls taken from call_queue and report outcomes on result_queue.

    This worker is run in a separate process.

    Items are processed until a None is received. At that point the worker
    puts its own pid on result_queue and returns.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read
            and evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.
    """
    while True:
        item = call_queue.get(block=True)
        if item is None:
            break
        try:
            outcome = item.fn(*item.args, **item.kwargs)
        except BaseException as err:
            # Ship the error together with its formatted traceback.
            wrapped = _ExceptionWithTraceback(err, err.__traceback__)
            result_queue.put(_ResultItem(item.work_id, exception=wrapped))
        else:
            result_queue.put(_ResultItem(item.work_id, result=outcome))
    # Wake up queue management thread
    result_queue.put(os.getpid())
182
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Move pending work into call_queue until it is full or ids run out.

    call_queue.full() is checked before every put, and work ids are fetched
    without blocking, so the function returns as soon as there is nothing
    more it can do.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while not call_queue.full():
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return

        work_item = pending_work_items[work_id]
        if not work_item.future.set_running_or_notify_cancel():
            # The future was cancelled: forget the item and try the next id.
            del pending_work_items[work_id]
            continue

        call_item = _CallItem(work_id,
                              work_item.fn,
                              work_item.args,
                              work_item.kwargs)
        call_queue.put(call_item, block=True)
219
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Each loop iteration tops up call_queue, then blocks until either something
    arrives on result_queue or one of the worker process sentinels becomes
    ready. In the latter case the pool is marked broken, every pending future
    is failed with BrokenProcessPool, and the function returns.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping pids to the multiprocessing.Process
            instances used as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    # Strong reference to the executor; only held between the dereference
    # near the bottom of the loop and the end of that iteration.
    executor = None

    def shutting_down():
        # True if the interpreter is exiting, the executor is gone (or not
        # currently dereferenced), or the executor has been shut down.
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # Send one None sentinel per live worker, then join all workers.
        # put_nowait() may raise queue.Full if call_queue has no room.
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    # Read end of the result queue, so it can be passed to wait() below.
    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        # Block until a result/wake-up is readable or a sentinel is ready.
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Only sentinels are ready: a worker process has terminated.
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # An int is the pid a worker puts on the queue right before it
            # returns after receiving its None sentinel.
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            # A _ResultItem; a None is only a wake-up and carries no data.
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # the wait() call above) and be able to send a sentinel again.
                pass
        # Drop the strong reference before the next iteration.
        executor = None
334
# Memoised outcome of _check_system_limits(): whether the check has already
# run, and the error message if the platform was found to be too limited.
_system_limits_checked = False
_system_limited = None
def _check_system_limits():
    """Raise NotImplementedError if the system provides too few semaphores.

    The limit is probed only on the first call. Later calls reuse the
    recorded outcome: they return immediately when the system was found
    adequate and re-raise the recorded message when it was not.

    Raises:
        NotImplementedError: If os.sysconf("SC_SEM_NSEMS_MAX") reports a
            determinate limit below 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found adequate: nothing more to do.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores "
                       "(%d available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
358
359
class BrokenProcessPool(RuntimeError):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running or pending state. Also raised by
    ProcessPoolExecutor.submit() once the pool has been marked broken.
    """
365
366
class ProcessPoolExecutor(_base.Executor):
    """Executor that evaluates submitted calls in worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            ValueError: If max_workers is zero or negative.
        """
        _check_system_limits()

        if max_workers is None:
            max_workers = os.cpu_count() or 1
        elif max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
        self._max_workers = max_workers

        # The call queue holds a little more than one call per process so
        # that workers do not sit idle, yet stays small because futures
        # already in the call queue cannot be cancelled.
        call_queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = multiprocessing.Queue(call_queue_size)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Worker processes, keyed by pid.
        self._processes = {}

        # Shutdown state; _shutdown_lock guards the checks made in submit().
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        # Next work id to hand out, and the items still awaiting a result.
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the workers and the management thread on first use."""
        if self._queue_management_thread is not None:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)

        # Start the processes so that their sentinels are known.
        self._adjust_process_count()
        manager_args = (weakref.ref(self, weakref_cb),
                        self._processes,
                        self._pending_work_items,
                        self._work_ids,
                        self._call_queue,
                        self._result_queue)
        self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=manager_args)
        self._queue_management_thread.daemon = True
        self._queue_management_thread.start()
        # Register the thread so the module's exit handler can wake it.
        _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until there are max_workers of them."""
        missing = self._max_workers - len(self._processes)
        for _ in range(missing):
            worker = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            worker.start()
            self._processes[worker.pid] = worker

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            work_id = self._queue_count
            self._pending_work_items[work_id] = _WorkItem(future, fn,
                                                          args, kwargs)
            self._work_ids.put(work_id)
            self._queue_count = work_id + 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            ValueError: If chunksize is less than one.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        # Each submitted call handles one whole chunk and returns a list;
        # the per-chunk lists are flattened back into a single stream.
        chunk_fn = partial(_process_chunk, fn)
        chunks = _get_chunks(*iterables, chunksize=chunksize)
        chunk_results = super().map(chunk_fn, chunks, timeout=timeout)
        return itertools.chain.from_iterable(chunk_results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
502
# Install the exit handler so that registered queue management threads are
# woken up and joined when the interpreter exits.
atexit.register(_python_exit)
504