1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ThreadPoolExecutor."""
5
6import atexit
7from concurrent.futures import _base
8import itertools
9import Queue as queue
10import threading
11import weakref
12import sys
13
try:
    from multiprocessing import cpu_count
except ImportError:
    # Some platforms don't have multiprocessing; provide a stand-in with the
    # same name so the rest of the module can call cpu_count() unconditionally.
    def cpu_count():
        """Return None, signalling that the CPU count is not available."""
        return None
20
21__author__ = 'Brian Quinlan (brian@sweetapp.com)'
22
23# Workers are created as daemon threads. This is done to allow the interpreter
24# to exit when there are still idle threads in a ThreadPoolExecutor's thread
25# pool (i.e. shutdown() was not called). However, allowing workers to die with
26# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
28#     meaning that they would fail in unpredictable ways.
29#   - The workers could be killed while evaluating a work item, which could
30#     be bad if the callable being evaluated has external side-effects e.g.
31#     writing to a file.
32#
33# To work around this problem, an exit handler is installed which tells the
34# workers to exit when their work queues are empty and then waits until the
35# threads finish.
36
# Registry of worker threads and the work queue each one consumes. Keys are
# weakly referenced, so an entry does not keep its thread object alive.
_threads_queues = weakref.WeakKeyDictionary()
# Flipped to True by _python_exit() once the exit handler has run.
_shutdown = False


def _python_exit():
    """Exit handler: tell every registered worker to stop, then wait for it.

    Sets the module-level _shutdown flag, puts a None sentinel on the queue
    of each registered thread, and finally joins each of those threads.
    """
    global _shutdown
    _shutdown = True

    # Take a snapshot so both passes below see the same (thread, queue) pairs.
    # A falsy registry (empty, or no longer a mapping at all) means there is
    # nothing to wake or wait for.
    if _threads_queues:
        registered = list(_threads_queues.items())
    else:
        registered = ()

    # First wake every worker, and only then start waiting, so that no join
    # blocks on a thread that has not been told to exit yet.
    for _, work_queue in registered:
        work_queue.put(None)
    for thread, _ in registered:
        thread.join(sys.maxint)


atexit.register(_python_exit)
50
class _WorkItem(object):
    """A pending call together with the future that receives its outcome."""

    def __init__(self, future, fn, args, kwargs):
        """Remember the call to perform and the future to report to.

        Args:
            future: Object updated by run() via set_running_or_notify_cancel(),
                set_result() and set_exception_info().
            fn: The callable to invoke.
            args: Sequence of positional arguments for fn.
            kwargs: Mapping of keyword arguments for fn.
        """
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Invoke the stored callable and record its result or exception.

        Returns immediately, without calling fn, when the future's
        set_running_or_notify_cancel() returns a false value.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except:
            # The bare except is deliberate: whatever the callable raises is
            # handed to the future instead of propagating to the caller.
            e, tb = sys.exc_info()[1:]
            self.future.set_exception_info(e, tb)
            # The traceback references this frame, and the frame's locals
            # reference the traceback and, through self, the future. If the
            # future keeps the traceback, that is a reference cycle which
            # pins this work item, fn and its arguments until the cyclic
            # garbage collector runs. Clear the locals to break it.
            del e, tb
            self = None
        else:
            self.future.set_result(result)
69
def _worker(executor_reference, work_queue):
    """Thread target: run queued work items until told to exit.

    Args:
        executor_reference: Zero-argument callable (a weakref in this module)
            returning the owning executor, or None once it has been collected.
        work_queue: Queue yielding work items; a None entry is a wake-up
            sentinel rather than work.
    """
    try:
        while True:
            item = work_queue.get(block=True)
            if item is None:
                # Woken by a sentinel: decide whether this worker should stop.
                owner = executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                if _shutdown or owner is None or owner._shutdown:
                    # Notify other workers by passing the sentinel on.
                    work_queue.put(None)
                    return
                # Drop the strong reference before blocking on the queue again.
                del owner
            else:
                item.run()
                # Delete references to object. See issue16284
                del item
    except:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
91
92
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on a pool of daemon threads.

    Threads are started lazily: each submit() starts one more worker until
    max_workers threads exist. All workers consume a single shared queue.
    """

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().next

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls. Defaults to five times the CPU
                count (or 5 when the CPU count is unknown).
            thread_name_prefix: An optional name prefix to give our threads.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        # Serializes submit() against shutdown().
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

    # No docstring here: it is copied from the base class just below.
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            # Once the exit handler has run, every worker stops at the None
            # sentinel it queued, so an item added now would never execute
            # and its future would never complete. Fail loudly instead.
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless max_workers already exist."""
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads. The queue is bound as a default argument so the
        # callback does not reference the executor itself.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # The worker only gets a weak reference to this executor.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            # Register the thread so the module's exit handler can stop it.
            _threads_queues[t] = self._work_queue

    # No docstring here: it is copied from the base class just below.
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # Wake a worker; each worker re-queues the sentinel as it exits.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                # NOTE(review): presumably joined with a huge timeout rather
                # than none so the wait stays interruptible on Python 2 -
                # confirm before changing.
                t.join(sys.maxint)
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
161