#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao__all__ = ['Pool'] 364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Imports 394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport threading 424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport Queue 434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport itertools 444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport collections 454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport time 464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing import Process, cpu_count, TimeoutError 484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing.util import Finalize, debug 494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Constants representing the state of a pool 524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 544adfde8bc82dd39f59e0445588c3e599ada477dJosh GaoRUN = 0 554adfde8bc82dd39f59e0445588c3e599ada477dJosh GaoCLOSE = 1 564adfde8bc82dd39f59e0445588c3e599ada477dJosh GaoTERMINATE = 2 574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Miscellaneous 604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaojob_counter = itertools.count() 634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaodef mapstar(args): 654adfde8bc82dd39f59e0445588c3e599ada477dJosh 
Gao return map(*args) 664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Code run by worker processes 694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass MaybeEncodingError(Exception): 724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao """Wraps possible unpickleable errors, so they can be 734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao safely sent through the socket.""" 744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __init__(self, exc, value): 764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self.exc = repr(exc) 774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self.value = repr(value) 784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao super(MaybeEncodingError, self).__init__(self.exc, self.value) 794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __str__(self): 814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return "Error sending result: '%s'. 
Reason: '%s'" % (self.value, 824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self.exc) 834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __repr__(self): 854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return "<MaybeEncodingError: %s>" % str(self) 864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaodef worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): 894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) 904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao put = outqueue.put 914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao get = inqueue.get 924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if hasattr(inqueue, '_writer'): 934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao inqueue._writer.close() 944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao outqueue._reader.close() 954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if initializer is not None: 974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao initializer(*initargs) 984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao completed = 0 1004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao while maxtasks is None or (maxtasks and completed < maxtasks): 1014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 1024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao task = get() 1034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except (EOFError, IOError): 1044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('worker got EOFError or IOError -- exiting') 1054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao break 1064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if task is None: 1084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('worker got 
sentinel -- exiting') 1094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao break 1104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao job, i, func, args, kwds = task 1124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 1134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao result = (True, func(*args, **kwds)) 1144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except Exception, e: 1154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao result = (False, e) 1164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 1174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao put((job, i, result)) 1184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except Exception as e: 1194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wrapped = MaybeEncodingError(e, result[1]) 1204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug("Possible encoding error while sending result: %s" % ( 1214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wrapped)) 1224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao put((job, i, (False, wrapped))) 1234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao completed += 1 1244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('worker exiting after %d tasks' % completed) 1254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 1274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Class representing a process pool 1284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 1294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass Pool(object): 1314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 1324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Class which supports an async version of the `apply()` builtin 1334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 1344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Process = Process 1354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __init__(self, 
processes=None, initializer=None, initargs=(), 1374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao maxtasksperchild=None): 1384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._setup_queues() 1394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._taskqueue = Queue.Queue() 1404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cache = {} 1414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._state = RUN 1424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._maxtasksperchild = maxtasksperchild 1434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._initializer = initializer 1444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._initargs = initargs 1454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if processes is None: 1474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 1484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao processes = cpu_count() 1494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except NotImplementedError: 1504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao processes = 1 1514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if processes < 1: 1524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao raise ValueError("Number of processes must be at least 1") 1534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if initializer is not None and not hasattr(initializer, '__call__'): 1554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao raise TypeError('initializer must be a callable') 1564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._processes = processes 1584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._pool = [] 1594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._repopulate_pool() 1604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._worker_handler = threading.Thread( 1624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 
target=Pool._handle_workers, 1634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao args=(self, ) 1644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ) 1654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._worker_handler.daemon = True 1664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._worker_handler._state = RUN 1674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._worker_handler.start() 1684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._task_handler = threading.Thread( 1714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao target=Pool._handle_tasks, 1724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao args=(self._taskqueue, self._quick_put, self._outqueue, self._pool) 1734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ) 1744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._task_handler.daemon = True 1754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._task_handler._state = RUN 1764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._task_handler.start() 1774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._result_handler = threading.Thread( 1794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao target=Pool._handle_results, 1804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao args=(self._outqueue, self._quick_get, self._cache) 1814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ) 1824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._result_handler.daemon = True 1834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._result_handler._state = RUN 1844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._result_handler.start() 1854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._terminate = Finalize( 1874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self, self._terminate_pool, 1884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao args=(self._taskqueue, 
self._inqueue, self._outqueue, self._pool, 1894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._worker_handler, self._task_handler, 1904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._result_handler, self._cache), 1914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao exitpriority=15 1924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ) 1934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _join_exited_workers(self): 1954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao """Cleanup after any worker processes which have exited due to reaching 1964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao their specified lifetime. Returns True if any workers were cleaned up. 1974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao """ 1984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao cleaned = False 1994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for i in reversed(range(len(self._pool))): 2004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao worker = self._pool[i] 2014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if worker.exitcode is not None: 2024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # worker exited 2034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('cleaning up worker %d' % i) 2044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao worker.join() 2054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao cleaned = True 2064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao del self._pool[i] 2074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return cleaned 2084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _repopulate_pool(self): 2104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao """Bring the number of pool processes up to the specified number, 2114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for use after reaping workers which have exited. 
2124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao """ 2134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for i in range(self._processes - len(self._pool)): 2144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao w = self.Process(target=worker, 2154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao args=(self._inqueue, self._outqueue, 2164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._initializer, 2174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._initargs, self._maxtasksperchild) 2184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ) 2194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._pool.append(w) 2204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao w.name = w.name.replace('Process', 'PoolWorker') 2214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao w.daemon = True 2224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao w.start() 2234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('added worker') 2244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _maintain_pool(self): 2264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao """Clean up any exited workers and start replacements for them. 
2274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao """ 2284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if self._join_exited_workers(): 2294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._repopulate_pool() 2304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _setup_queues(self): 2324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao from .queues import SimpleQueue 2334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._inqueue = SimpleQueue() 2344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._outqueue = SimpleQueue() 2354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._quick_put = self._inqueue._writer.send 2364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._quick_get = self._outqueue._reader.recv 2374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def apply(self, func, args=(), kwds={}): 2394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Equivalent of `apply()` builtin 2414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert self._state == RUN 2434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return self.apply_async(func, args, kwds).get() 2444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def map(self, func, iterable, chunksize=None): 2464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Equivalent of `map()` builtin 2484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert self._state == RUN 2504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return self.map_async(func, iterable, chunksize).get() 2514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def imap(self, func, iterable, chunksize=1): 
2534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()` 2554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert self._state == RUN 2574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if chunksize == 1: 2584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao result = IMapIterator(self._cache) 2594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._taskqueue.put((((result._job, i, func, (x,), {}) 2604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for i, x in enumerate(iterable)), result._set_length)) 2614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return result 2624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 2634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert chunksize > 1 2644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao task_batches = Pool._get_tasks(func, iterable, chunksize) 2654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao result = IMapIterator(self._cache) 2664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 2674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for i, x in enumerate(task_batches)), result._set_length)) 2684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return (item for chunk in result for item in chunk) 2694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def imap_unordered(self, func, iterable, chunksize=1): 2714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Like `imap()` method but ordering of results is arbitrary 2734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert self._state == RUN 2754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if chunksize == 1: 2764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao result = 
IMapUnorderedIterator(self._cache) 2774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._taskqueue.put((((result._job, i, func, (x,), {}) 2784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for i, x in enumerate(iterable)), result._set_length)) 2794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return result 2804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 2814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert chunksize > 1 2824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao task_batches = Pool._get_tasks(func, iterable, chunksize) 2834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao result = IMapUnorderedIterator(self._cache) 2844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 2854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for i, x in enumerate(task_batches)), result._set_length)) 2864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return (item for chunk in result for item in chunk) 2874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def apply_async(self, func, args=(), kwds={}, callback=None): 2894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Asynchronous equivalent of `apply()` builtin 2914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert self._state == RUN 2934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao result = ApplyResult(self._cache, callback) 2944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) 2954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return result 2964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def map_async(self, func, iterable, chunksize=None, callback=None): 2984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 2994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Asynchronous equivalent of 
`map()` builtin 3004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ''' 3014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert self._state == RUN 3024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not hasattr(iterable, '__len__'): 3034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao iterable = list(iterable) 3044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if chunksize is None: 3064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 3074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if extra: 3084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao chunksize += 1 3094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if len(iterable) == 0: 3104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao chunksize = 0 3114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao task_batches = Pool._get_tasks(func, iterable, chunksize) 3134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao result = MapResult(self._cache, chunksize, len(iterable), callback) 3144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 3154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for i, x in enumerate(task_batches)), None)) 3164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return result 3174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao @staticmethod 3194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _handle_workers(pool): 3204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao thread = threading.current_thread() 3214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # Keep maintaining workers until the cache gets drained, unless the pool 3234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # is terminated. 
3244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao while thread._state == RUN or (pool._cache and thread._state != TERMINATE): 3254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao pool._maintain_pool() 3264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao time.sleep(0.1) 3274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # send sentinel to stop workers 3284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao pool._taskqueue.put(None) 3294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('worker handler exiting') 3304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao @staticmethod 3324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _handle_tasks(taskqueue, put, outqueue, pool): 3334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao thread = threading.current_thread() 3344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for taskseq, set_length in iter(taskqueue.get, None): 3364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao i = -1 3374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for i, task in enumerate(taskseq): 3384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if thread._state: 3394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('task handler found thread._state != RUN') 3404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao break 3414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao put(task) 3434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except IOError: 3444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('could not put task on queue') 3454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao break 3464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 3474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if set_length: 3484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('doing set_length()') 3494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao set_length(i+1) 3504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao continue 
3514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao break 3524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 3534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('task handler got sentinel') 3544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # tell result handler to finish when cache is empty 3584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('task handler sending sentinel to result handler') 3594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao outqueue.put(None) 3604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # tell workers there is no more work 3624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('task handler sending sentinel to workers') 3634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for p in pool: 3644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao put(None) 3654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except IOError: 3664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('task handler got IOError when sending sentinels') 3674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('task handler exiting') 3694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao @staticmethod 3714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _handle_results(outqueue, get, cache): 3724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao thread = threading.current_thread() 3734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao while 1: 3754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao task = get() 3774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except (IOError, EOFError): 3784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('result handler got 
EOFError/IOError -- exiting') 3794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return 3804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if thread._state: 3824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert thread._state == TERMINATE 3834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('result handler found thread._state=TERMINATE') 3844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao break 3854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if task is None: 3874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('result handler got sentinel') 3884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao break 3894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao job, i, obj = task 3914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao cache[job]._set(i, obj) 3934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except KeyError: 3944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao pass 3954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao while cache and thread._state != TERMINATE: 3974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao task = get() 3994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except (IOError, EOFError): 4004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('result handler got EOFError/IOError -- exiting') 4014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return 4024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 4034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if task is None: 4044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('result handler ignoring extra sentinel') 4054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao continue 4064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao job, i, obj = task 4074adfde8bc82dd39f59e0445588c3e599ada477dJosh 
Gao try: 4084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao cache[job]._set(i, obj) 4094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except KeyError: 4104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao pass 4114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 4124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if hasattr(outqueue, '_reader'): 4134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('ensuring that outqueue is not full') 4144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # If we don't make room available in outqueue then 4154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # attempts to add the sentinel (None) to outqueue may 4164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # block. There is guaranteed to be no more than 2 sentinels. 4174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 4184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao for i in range(10): 4194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not outqueue._reader.poll(): 4204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao break 4214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao get() 4224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except (IOError, EOFError): 4234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao pass 4244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 4254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('result handler exiting: len(cache)=%s, thread._state=%s', 4264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao len(cache), thread._state) 4274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 4284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao @staticmethod 4294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _get_tasks(func, it, size): 4304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao it = iter(it) 4314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao while 1: 4324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao x = tuple(itertools.islice(it, size)) 4334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not x: 4344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return 
def __reduce__(self):
    """Refuse pickling: always raises NotImplementedError."""
    message = 'pool objects cannot be passed between processes or pickled'
    raise NotImplementedError(message)

def close(self):
    """Move the pool and its worker handler from RUN to CLOSE.

    Only logs when the pool is in any state other than RUN.
    """
    debug('closing pool')
    if self._state != RUN:
        return
    self._state = CLOSE
    self._worker_handler._state = CLOSE

def terminate(self):
    """Flag the pool and its worker handler as TERMINATE, then invoke the
    pool's ``_terminate`` callable."""
    debug('terminating pool')
    self._state = self._worker_handler._state = TERMINATE
    self._terminate()

def join(self):
    """Wait for the three handler threads and then every pool worker.

    Asserts that the pool state is already CLOSE or TERMINATE.
    """
    debug('joining pool')
    assert self._state in (CLOSE, TERMINATE)
    # Attributes are looked up one at a time, right before each join.
    for attr in ('_worker_handler', '_task_handler', '_result_handler'):
        getattr(self, attr).join()
    for worker in self._pool:
        worker.join()
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
    """Discard items readable from `inqueue` while the task handler thread
    is still alive.

    `size` is accepted but not used by this implementation.
    """
    # task_handler may be blocked trying to put items on inqueue
    debug('removing tasks from inqueue until task handler finished')
    # NOTE(review): the read lock is acquired here and never released in
    # this function -- presumably intentional during shutdown; confirm
    # before changing.
    inqueue._rlock.acquire()
    while True:
        if not (task_handler.is_alive() and inqueue._reader.poll()):
            break
        inqueue._reader.recv()
        time.sleep(0)

@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                    worker_handler, task_handler, result_handler, cache):
    """Shut the pool down: flag the handler threads as TERMINATE, unblock
    them, terminate unfinished workers, and join handlers and workers.

    `taskqueue` is accepted but not used in this function.
    """
    # this is guaranteed to only be called once
    debug('finalizing pool')

    for handler in (worker_handler, task_handler):
        handler._state = TERMINATE

    debug('helping task handler/workers to finish')
    cls._help_stuff_finish(inqueue, task_handler, len(pool))

    assert result_handler.is_alive() or len(cache) == 0

    result_handler._state = TERMINATE
    outqueue.put(None)                  # sentinel

    def join_unless_current(handler):
        # Skip the join when this code is running on `handler` itself.
        if threading.current_thread() is not handler:
            handler.join(1e100)

    # We must wait for the worker handler to exit before terminating
    # workers because we don't want workers to be restarted behind our back.
    debug('joining worker handler')
    join_unless_current(worker_handler)

    # Terminate workers which haven't already finished.
    if pool and hasattr(pool[0], 'terminate'):
        debug('terminating workers')
        for worker in pool:
            if worker.exitcode is None:
                worker.terminate()

    debug('joining task handler')
    join_unless_current(task_handler)

    debug('joining result handler')
    join_unless_current(result_handler)

    if pool and hasattr(pool[0], 'terminate'):
        debug('joining pool workers')
        for worker in pool:
            if not worker.is_alive():
                continue
            # worker has not yet exited
            debug('cleaning up worker %d' % worker.pid)
            worker.join()
#
# Class whose instances are returned by `Pool.apply_async()`
#

class ApplyResult(object):
    """Handle for the outcome of a single asynchronous call.

    On construction the instance registers itself in `cache` under a job
    number drawn from ``job_counter``; `_set()` later stores the outcome,
    wakes waiting threads and removes the cache entry.
    """

    def __init__(self, cache, callback):
        """Register a pending result in `cache`.

        `callback`, when truthy, is called with the value of a successful
        outcome.
        """
        self._cond = threading.Condition(threading.Lock())
        # The next() builtin works on Python 2.6+ and 3.x alike.
        self._job = next(job_counter)
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def ready(self):
        """Return True once an outcome has been stored."""
        return self._ready

    def successful(self):
        """Return whether the call succeeded; asserts the result is ready."""
        assert self._ready
        return self._success

    def wait(self, timeout=None):
        """Block until the outcome is stored or `timeout` seconds elapse."""
        with self._cond:
            if not self._ready:
                self._cond.wait(timeout)

    def get(self, timeout=None):
        """Return the stored value, or raise it if the call failed.

        Raises TimeoutError when no outcome arrives within `timeout`.
        """
        self.wait(timeout)
        if not self._ready:
            raise TimeoutError
        if self._success:
            return self._value
        raise self._value

    def _set(self, i, obj):
        """Store the ``(success, value)`` pair `obj`; `i` is unused here.

        The callback runs before the result is flagged ready.  If it
        raises, the exception still propagates, but the result is marked
        ready and dropped from the cache first so waiters are not left
        blocked forever.
        """
        self._success, self._value = obj
        try:
            if self._callback and self._success:
                self._callback(self._value)
        finally:
            with self._cond:
                self._ready = True
                # Several threads may wait on the same result: wake all.
                self._cond.notify_all()
            del self._cache[self._job]

AsyncResult = ApplyResult       # create alias -- see #17805
#
# Class whose instances are returned by `Pool.map_async()`
#

class MapResult(ApplyResult):
    """Result of a chunked map: collects per-chunk outcomes into one list.

    The value starts as a list of `length` ``None`` entries; each
    successful chunk `i` overwrites the slice
    ``[i*chunksize:(i+1)*chunksize]``.  The first failing chunk replaces
    the value with the failure object and completes the result.
    """

    def __init__(self, cache, chunksize, length, callback):
        """Set up bookkeeping for ``ceil(length / chunksize)`` chunks.

        With ``chunksize <= 0`` there is nothing to wait for: the result
        is flagged ready immediately and removed from `cache`.
        """
        ApplyResult.__init__(self, cache, callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            self._number_left = 0
            self._ready = True
            del cache[self._job]
        else:
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        """Record the ``(success, result)`` outcome of chunk number `i`."""
        success, result = success_result
        if success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            self._number_left -= 1
            if self._number_left == 0:
                try:
                    if self._callback:
                        self._callback(self._value)
                finally:
                    # Complete even if the callback raised, so waiters
                    # are not left blocked; the exception still propagates.
                    self._mark_ready()
        else:
            self._success = False
            self._value = result
            self._mark_ready()

    def _mark_ready(self):
        """Drop the cache entry, flag the result ready and wake waiters."""
        del self._cache[self._job]
        with self._cond:
            self._ready = True
            # Several threads may wait on the same result: wake all.
            self._cond.notify_all()
#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):
    """Iterator that hands out ``(success, value)`` outcomes in index order.

    Outcomes arriving out of order are parked in ``_unsorted`` until the
    preceding indices have been delivered.  The instance registers itself
    in `cache` and removes itself once ``_index`` reaches ``_length``.
    """

    def __init__(self, cache):
        """Register an empty iterator in `cache` under a fresh job number."""
        self._cond = threading.Condition(threading.Lock())
        # The next() builtin works on Python 2.6+ and 3.x alike.
        self._job = next(job_counter)
        self._cache = cache
        self._items = collections.deque()
        self._index = 0
        self._length = None
        self._unsorted = {}
        cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next value, or raise it if that item failed.

        Raises StopIteration when all `_length` items have been consumed
        and TimeoutError when nothing arrives within `timeout` seconds.
        """
        with self._cond:
            deadline = None if timeout is None else time.time() + timeout
            # Loop instead of re-checking once: another consumer may have
            # taken the item between the notification and our wake-up,
            # which must not surface as a TimeoutError.
            while not self._items:
                if self._index == self._length:
                    raise StopIteration
                if deadline is None:
                    self._cond.wait()
                else:
                    remaining = deadline - time.time()
                    if remaining <= 0:
                        raise TimeoutError
                    self._cond.wait(remaining)
            item = self._items.popleft()

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        """Accept outcome `obj` for index `i`, releasing any parked
        successors that have become deliverable."""
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                # Several items may have been queued at once: wake every
                # waiting consumer, each re-checks the deque itself.
                self._cond.notify_all()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                del self._cache[self._job]

    def _set_length(self, length):
        """Record the total item count; finish if it is already reached."""
        with self._cond:
            self._length = length
            if self._index == self._length:
                # Every waiter has to observe the end of iteration.
                self._cond.notify_all()
                del self._cache[self._job]
#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):
    """Variant of IMapIterator that queues outcomes in arrival order."""

    def _set(self, i, obj):
        """Queue outcome `obj` immediately; the index `i` is ignored."""
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]

#
#
#

class ThreadPool(Pool):
    """Pool variant whose ``Process`` comes from the package's ``dummy``
    module and whose queues are ``Queue.Queue`` instances."""

    from .dummy import Process

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        """Create the in/out queues and bind the fast put/get shortcuts."""
        self._inqueue = Queue.Queue()
        self._outqueue = Queue.Queue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        """Replace the contents of `inqueue` with `size` ``None`` sentinels
        and wake all threads waiting for items; `task_handler` is unused."""
        # put sentinels at head of inqueue to make workers finish
        with inqueue.not_empty:
            pending = inqueue.queue
            pending.clear()
            pending.extend([None] * size)
            inqueue.not_empty.notify_all()