10a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 20a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Module providing the `Pool` class for managing a process pool 30a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 40a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# multiprocessing/pool.py 50a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 60a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Copyright (c) 2006-2008, R Oudkerk 70a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# All rights reserved. 80a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 90a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Redistribution and use in source and binary forms, with or without 100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# modification, are permitted provided that the following conditions 110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# are met: 120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 1. Redistributions of source code must retain the above copyright 140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# notice, this list of conditions and the following disclaimer. 150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 2. Redistributions in binary form must reproduce the above copyright 160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# notice, this list of conditions and the following disclaimer in the 170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# documentation and/or other materials provided with the distribution. 180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 3. Neither the name of author nor the names of any contributors may be 190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# used to endorse or promote products derived from this software 200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# without specific prior written permission. 210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# SUCH DAMAGE. 330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao__all__ = ['Pool'] 360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Imports 390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 410a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport threading 420a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport Queue 430a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport itertools 440a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport collections 450a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport time 460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 470a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom multiprocessing import Process, cpu_count, TimeoutError 480a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom multiprocessing.util import Finalize, debug 490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Constants representing the state of a pool 520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 540a8c90248264a8b26970b4473770bcc3df8515fJosh GaoRUN = 0 550a8c90248264a8b26970b4473770bcc3df8515fJosh GaoCLOSE = 1 560a8c90248264a8b26970b4473770bcc3df8515fJosh GaoTERMINATE = 2 570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Miscellaneous 600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 620a8c90248264a8b26970b4473770bcc3df8515fJosh Gaojob_counter = itertools.count() 630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 640a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef mapstar(args): 650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return map(*args) 660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Code run by worker processes 690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 710a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass MaybeEncodingError(Exception): 720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao """Wraps possible unpickleable errors, so they can be 730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao safely sent through the socket.""" 740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __init__(self, exc, value): 760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.exc = repr(exc) 770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.value = repr(value) 780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao super(MaybeEncodingError, self).__init__(self.exc, self.value) 790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __str__(self): 810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return "Error sending result: '%s'. Reason: '%s'" % (self.value, 820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.exc) 830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __repr__(self): 850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return "<MaybeEncodingError: %s>" % str(self) 860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 880a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): 890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) 900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao put = outqueue.put 910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao get = inqueue.get 920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if hasattr(inqueue, '_writer'): 930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao inqueue._writer.close() 940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao outqueue._reader.close() 950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if initializer is not None: 970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao initializer(*initargs) 980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao completed = 0 1000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao while maxtasks is None or (maxtasks and completed < maxtasks): 1010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 1020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao task = get() 1030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except (EOFError, IOError): 1040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('worker got EOFError or IOError -- exiting') 1050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao break 1060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if task is None: 1080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('worker got sentinel -- exiting') 1090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao break 1100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao job, i, func, args, kwds = task 1120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 1130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result = (True, func(*args, **kwds)) 1140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except Exception, e: 1150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result = (False, e) 1160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 1170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao put((job, i, result)) 1180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except Exception as e: 1190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao wrapped = MaybeEncodingError(e, result[1]) 1200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug("Possible encoding error while sending result: %s" % ( 1210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao wrapped)) 1220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao put((job, i, (False, wrapped))) 1230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao completed += 1 1240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('worker exiting after %d tasks' % completed) 1250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 1270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class representing a process pool 1280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 1290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1300a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass Pool(object): 1310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 1320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao Class which supports an async version of the `apply()` builtin 1330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 1340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao Process = Process 1350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __init__(self, processes=None, initializer=None, initargs=(), 1370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao maxtasksperchild=None): 1380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._setup_queues() 1390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._taskqueue = Queue.Queue() 1400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cache = {} 1410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._state = RUN 1420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._maxtasksperchild = maxtasksperchild 1430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._initializer = initializer 1440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._initargs = initargs 1450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if processes is None: 1470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 1480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao processes = cpu_count() 1490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except NotImplementedError: 1500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao processes = 1 1510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if processes < 1: 1520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise ValueError("Number of processes must be at least 1") 1530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if initializer is not None and not hasattr(initializer, '__call__'): 1550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise TypeError('initializer must be a callable') 1560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._processes = processes 1580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._pool = [] 1590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._repopulate_pool() 1600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._worker_handler = threading.Thread( 1620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao target=Pool._handle_workers, 1630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao args=(self, ) 1640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ) 1650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._worker_handler.daemon = True 1660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._worker_handler._state = RUN 1670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._worker_handler.start() 1680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._task_handler = threading.Thread( 1710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao target=Pool._handle_tasks, 1720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao args=(self._taskqueue, self._quick_put, self._outqueue, self._pool) 1730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ) 1740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._task_handler.daemon = True 1750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._task_handler._state = RUN 1760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._task_handler.start() 1770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._result_handler = threading.Thread( 1790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao target=Pool._handle_results, 1800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao args=(self._outqueue, self._quick_get, self._cache) 1810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ) 1820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._result_handler.daemon = True 1830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._result_handler._state = RUN 1840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._result_handler.start() 1850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._terminate = Finalize( 1870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self, self._terminate_pool, 1880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, 1890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._worker_handler, self._task_handler, 1900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._result_handler, self._cache), 1910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao exitpriority=15 1920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ) 1930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _join_exited_workers(self): 1950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao """Cleanup after any worker processes which have exited due to reaching 1960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao their specified lifetime. Returns True if any workers were cleaned up. 1970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao """ 1980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao cleaned = False 1990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for i in reversed(range(len(self._pool))): 2000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao worker = self._pool[i] 2010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if worker.exitcode is not None: 2020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # worker exited 2030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('cleaning up worker %d' % i) 2040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao worker.join() 2050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao cleaned = True 2060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao del self._pool[i] 2070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return cleaned 2080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _repopulate_pool(self): 2100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao """Bring the number of pool processes up to the specified number, 2110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for use after reaping workers which have exited. 2120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao """ 2130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for i in range(self._processes - len(self._pool)): 2140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao w = self.Process(target=worker, 2150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao args=(self._inqueue, self._outqueue, 2160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._initializer, 2170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._initargs, self._maxtasksperchild) 2180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ) 2190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._pool.append(w) 2200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao w.name = w.name.replace('Process', 'PoolWorker') 2210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao w.daemon = True 2220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao w.start() 2230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('added worker') 2240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _maintain_pool(self): 2260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao """Clean up any exited workers and start replacements for them. 2270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao """ 2280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._join_exited_workers(): 2290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._repopulate_pool() 2300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _setup_queues(self): 2320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao from .queues import SimpleQueue 2330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._inqueue = SimpleQueue() 2340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._outqueue = SimpleQueue() 2350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._quick_put = self._inqueue._writer.send 2360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._quick_get = self._outqueue._reader.recv 2370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def apply(self, func, args=(), kwds={}): 2390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao Equivalent of `apply()` builtin 2410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert self._state == RUN 2430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return self.apply_async(func, args, kwds).get() 2440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def map(self, func, iterable, chunksize=None): 2460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao Equivalent of `map()` builtin 2480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert self._state == RUN 2500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return self.map_async(func, iterable, chunksize).get() 2510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def imap(self, func, iterable, chunksize=1): 2530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()` 2550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert self._state == RUN 2570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if chunksize == 1: 2580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result = IMapIterator(self._cache) 2590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._taskqueue.put((((result._job, i, func, (x,), {}) 2600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for i, x in enumerate(iterable)), result._set_length)) 2610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return result 2620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao else: 2630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert chunksize > 1 2640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao task_batches = Pool._get_tasks(func, iterable, chunksize) 2650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result = IMapIterator(self._cache) 2660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 2670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for i, x in enumerate(task_batches)), result._set_length)) 2680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return (item for chunk in result for item in chunk) 2690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def imap_unordered(self, func, iterable, chunksize=1): 2710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao Like `imap()` method but ordering of results is arbitrary 2730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert self._state == RUN 2750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if chunksize == 1: 2760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result = IMapUnorderedIterator(self._cache) 2770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._taskqueue.put((((result._job, i, func, (x,), {}) 2780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for i, x in enumerate(iterable)), result._set_length)) 2790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return result 2800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao else: 2810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert chunksize > 1 2820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao task_batches = Pool._get_tasks(func, iterable, chunksize) 2830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result = IMapUnorderedIterator(self._cache) 2840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 2850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for i, x in enumerate(task_batches)), result._set_length)) 2860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return (item for chunk in result for item in chunk) 2870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def apply_async(self, func, args=(), kwds={}, callback=None): 2890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao Asynchronous equivalent of `apply()` builtin 2910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert self._state == RUN 2930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result = ApplyResult(self._cache, callback) 2940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) 2950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return result 2960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def map_async(self, func, iterable, chunksize=None, callback=None): 2980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 2990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao Asynchronous equivalent of `map()` builtin 3000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ''' 3010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert self._state == RUN 3020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if not hasattr(iterable, '__len__'): 3030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao iterable = list(iterable) 3040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if chunksize is None: 3060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 3070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if extra: 3080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao chunksize += 1 3090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if len(iterable) == 0: 3100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao chunksize = 0 3110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao task_batches = Pool._get_tasks(func, iterable, chunksize) 3130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result = MapResult(self._cache, chunksize, len(iterable), callback) 3140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 3150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for i, x in enumerate(task_batches)), None)) 3160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return result 3170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @staticmethod 3190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _handle_workers(pool): 3200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao thread = threading.current_thread() 3210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # Keep maintaining workers until the cache gets drained, unless the pool 3230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # is terminated. 3240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao while thread._state == RUN or (pool._cache and thread._state != TERMINATE): 3250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao pool._maintain_pool() 3260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao time.sleep(0.1) 3270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # send sentinel to stop workers 3280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao pool._taskqueue.put(None) 3290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('worker handler exiting') 3300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @staticmethod 3320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _handle_tasks(taskqueue, put, outqueue, pool): 3330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao thread = threading.current_thread() 3340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for taskseq, set_length in iter(taskqueue.get, None): 3360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao i = -1 3370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for i, task in enumerate(taskseq): 3380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if thread._state: 3390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('task handler found thread._state != RUN') 3400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao break 3410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 3420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao put(task) 3430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except IOError: 3440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('could not put task on queue') 3450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao break 3460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao else: 3470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if set_length: 3480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('doing set_length()') 3490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao set_length(i+1) 3500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao continue 3510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao break 3520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao else: 3530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('task handler got sentinel') 3540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 3570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # tell result handler to finish when cache is empty 3580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('task handler sending sentinel to result handler') 3590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao outqueue.put(None) 3600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # tell workers there is no more work 3620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('task handler sending sentinel to workers') 3630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for p in pool: 3640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao put(None) 3650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except IOError: 3660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('task handler got IOError when sending sentinels') 3670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('task handler exiting') 3690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @staticmethod 3710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _handle_results(outqueue, get, cache): 3720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao thread = threading.current_thread() 3730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao while 1: 3750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 3760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao task = get() 3770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except (IOError, EOFError): 3780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('result handler got EOFError/IOError -- exiting') 3790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return 3800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if thread._state: 3820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert thread._state == TERMINATE 3830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('result handler found thread._state=TERMINATE') 3840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao break 3850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if task is None: 3870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('result handler got sentinel') 3880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao break 3890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao job, i, obj = task 3910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 3920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao cache[job]._set(i, obj) 3930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except KeyError: 3940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao pass 3950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 3960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao while cache and thread._state != TERMINATE: 3970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 3980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao task = get() 3990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except (IOError, EOFError): 4000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('result handler got EOFError/IOError -- exiting') 4010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return 4020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if task is None: 4040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('result handler ignoring extra sentinel') 4050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao continue 4060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao job, i, obj = task 4070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 4080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao cache[job]._set(i, obj) 4090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except KeyError: 4100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao pass 4110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if hasattr(outqueue, '_reader'): 4130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('ensuring that outqueue is not full') 4140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # If we don't make room available in outqueue then 4150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # attempts to add the sentinel (None) to outqueue may 4160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # block. There is guaranteed to be no more than 2 sentinels. 4170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 4180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for i in range(10): 4190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if not outqueue._reader.poll(): 4200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao break 4210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao get() 4220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except (IOError, EOFError): 4230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao pass 4240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('result handler exiting: len(cache)=%s, thread._state=%s', 4260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao len(cache), thread._state) 4270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @staticmethod 4290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _get_tasks(func, it, size): 4300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao it = iter(it) 4310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao while 1: 4320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao x = tuple(itertools.islice(it, size)) 4330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if not x: 4340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return 4350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao yield (func, x) 4360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __reduce__(self): 4380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise NotImplementedError( 4390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 'pool objects cannot be passed between processes or pickled' 4400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ) 4410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def close(self): 4430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('closing pool') 4440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._state == RUN: 4450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._state = CLOSE 4460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._worker_handler._state = CLOSE 4470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def terminate(self): 4490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('terminating pool') 4500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._state = TERMINATE 4510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._worker_handler._state = TERMINATE 4520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._terminate() 4530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def join(self): 4550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('joining pool') 4560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert self._state in (CLOSE, TERMINATE) 4570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._worker_handler.join() 4580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._task_handler.join() 4590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._result_handler.join() 4600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for p in self._pool: 4610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao p.join() 4620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @staticmethod 4640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _help_stuff_finish(inqueue, task_handler, size): 4650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # task_handler may be blocked trying to put items on inqueue 4660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('removing tasks from inqueue until task handler finished') 4670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao inqueue._rlock.acquire() 4680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao while task_handler.is_alive() and inqueue._reader.poll(): 4690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao inqueue._reader.recv() 4700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao time.sleep(0) 4710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @classmethod 4730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, 4740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao worker_handler, task_handler, result_handler, cache): 4750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # this is guaranteed to only be called once 4760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('finalizing pool') 4770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao worker_handler._state = TERMINATE 4790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao task_handler._state = TERMINATE 4800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('helping task handler/workers to finish') 4820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao cls._help_stuff_finish(inqueue, task_handler, len(pool)) 4830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert result_handler.is_alive() or len(cache) == 0 4850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result_handler._state = TERMINATE 4870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao outqueue.put(None) # sentinel 4880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # We must wait for the worker handler to exit before terminating 4900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # workers because we don't want workers to be restarted behind our back. 4910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('joining worker handler') 4920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if threading.current_thread() is not worker_handler: 4930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao worker_handler.join(1e100) 4940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 4950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # Terminate workers which haven't already finished. 4960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if pool and hasattr(pool[0], 'terminate'): 4970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('terminating workers') 4980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for p in pool: 4990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if p.exitcode is None: 5000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao p.terminate() 5010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('joining task handler') 5030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if threading.current_thread() is not task_handler: 5040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao task_handler.join(1e100) 5050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('joining result handler') 5070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if threading.current_thread() is not result_handler: 5080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao result_handler.join(1e100) 5090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if pool and hasattr(pool[0], 'terminate'): 5110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('joining pool workers') 5120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for p in pool: 5130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if p.is_alive(): 5140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # worker has not yet exited 5150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao debug('cleaning up worker %d' % p.pid) 5160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao p.join() 5170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 5190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class whose instances are returned by `Pool.apply_async()` 5200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 5210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5220a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ApplyResult(object): 5230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __init__(self, cache, callback): 5250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond = threading.Condition(threading.Lock()) 5260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._job = job_counter.next() 5270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cache = cache 5280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._ready = False 5290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._callback = callback 5300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao cache[self._job] = self 5310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def ready(self): 5330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return self._ready 5340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def successful(self): 5360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao assert self._ready 5370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return self._success 5380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def wait(self, timeout=None): 5400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.acquire() 5410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 5420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if not self._ready: 5430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.wait(timeout) 5440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 5450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.release() 5460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def get(self, timeout=None): 5480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.wait(timeout) 5490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if not self._ready: 5500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise TimeoutError 5510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._success: 5520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return self._value 5530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao else: 5540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise self._value 5550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _set(self, i, obj): 5570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._success, self._value = obj 5580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._callback and self._success: 5590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._callback(self._value) 5600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.acquire() 5610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 5620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._ready = True 5630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.notify() 5640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 5650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.release() 5660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao del self._cache[self._job] 5670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5680a8c90248264a8b26970b4473770bcc3df8515fJosh GaoAsyncResult = ApplyResult # create alias -- see #17805 5690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 5710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class whose instances are returned by `Pool.map_async()` 5720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 5730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5740a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass MapResult(ApplyResult): 5750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __init__(self, cache, chunksize, length, callback): 5770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ApplyResult.__init__(self, cache, callback) 5780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._success = True 5790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._value = [None] * length 5800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._chunksize = chunksize 5810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if chunksize <= 0: 5820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._number_left = 0 5830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._ready = True 5840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao del cache[self._job] 5850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao else: 5860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._number_left = length//chunksize + bool(length % chunksize) 5870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 5880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _set(self, i, success_result): 5890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao success, result = success_result 5900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if success: 5910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._value[i*self._chunksize:(i+1)*self._chunksize] = result 5920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._number_left -= 1 5930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._number_left == 0: 5940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._callback: 5950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._callback(self._value) 5960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao del self._cache[self._job] 5970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.acquire() 5980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 5990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._ready = True 6000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.notify() 6010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 6020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.release() 6030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao else: 6050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._success = False 6060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._value = result 6070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao del self._cache[self._job] 6080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.acquire() 6090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 6100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._ready = True 6110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.notify() 6120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 6130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.release() 6140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 6160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class whose instances are returned by `Pool.imap()` 6170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 6180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6190a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass IMapIterator(object): 6200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __init__(self, cache): 6220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond = threading.Condition(threading.Lock()) 6230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._job = job_counter.next() 6240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cache = cache 6250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._items = collections.deque() 6260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._index = 0 6270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._length = None 6280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._unsorted = {} 6290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao cache[self._job] = self 6300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __iter__(self): 6320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return self 6330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def next(self, timeout=None): 6350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.acquire() 6360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 6370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 6380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao item = self._items.popleft() 6390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except IndexError: 6400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._index == self._length: 6410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise StopIteration 6420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.wait(timeout) 6430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 6440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao item = self._items.popleft() 6450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except IndexError: 6460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._index == self._length: 6470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise StopIteration 6480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise TimeoutError 6490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 6500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.release() 6510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao success, value = item 6530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if success: 6540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return value 6550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise value 6560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao __next__ = next # XXX 6580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _set(self, i, obj): 6600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.acquire() 6610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 6620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._index == i: 6630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._items.append(obj) 6640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._index += 1 6650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao while self._index in self._unsorted: 6660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao obj = self._unsorted.pop(self._index) 6670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._items.append(obj) 6680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._index += 1 6690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.notify() 6700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao else: 6710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._unsorted[i] = obj 6720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._index == self._length: 6740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao del self._cache[self._job] 6750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 6760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.release() 6770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _set_length(self, length): 6790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.acquire() 6800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 6810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._length = length 6820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._index == self._length: 6830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.notify() 6840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao del self._cache[self._job] 6850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 6860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.release() 6870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 6890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class whose instances are returned by `Pool.imap_unordered()` 6900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 6910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6920a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass IMapUnorderedIterator(IMapIterator): 6930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 6940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _set(self, i, obj): 6950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.acquire() 6960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 6970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._items.append(obj) 6980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._index += 1 6990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.notify() 7000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self._index == self._length: 7010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao del self._cache[self._job] 7020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 7030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._cond.release() 7040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 7050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 7060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 7070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 7080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 7090a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ThreadPool(Pool): 7100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 7110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao from .dummy import Process 7120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 7130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def __init__(self, processes=None, initializer=None, initargs=()): 7140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao Pool.__init__(self, processes, initializer, initargs) 7150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 7160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _setup_queues(self): 7170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._inqueue = Queue.Queue() 7180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._outqueue = Queue.Queue() 7190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._quick_put = self._inqueue.put 7200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._quick_get = self._outqueue.get 7210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 7220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @staticmethod 7230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _help_stuff_finish(inqueue, task_handler, size): 7240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # put sentinels at head of inqueue to make workers finish 7250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao inqueue.not_empty.acquire() 7260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 7270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao inqueue.queue.clear() 7280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao inqueue.queue.extend([None] * size) 7290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao inqueue.not_empty.notify_all() 7300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 7310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao inqueue.not_empty.release() 732