10a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
20a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Module providing the `Pool` class for managing a process pool
30a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
40a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# multiprocessing/pool.py
50a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
60a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Copyright (c) 2006-2008, R Oudkerk
70a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# All rights reserved.
80a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
90a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Redistribution and use in source and binary forms, with or without
100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# modification, are permitted provided that the following conditions
110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# are met:
120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 1. Redistributions of source code must retain the above copyright
140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    notice, this list of conditions and the following disclaimer.
150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 2. Redistributions in binary form must reproduce the above copyright
160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    notice, this list of conditions and the following disclaimer in the
170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    documentation and/or other materials provided with the distribution.
180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# 3. Neither the name of author nor the names of any contributors may be
190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    used to endorse or promote products derived from this software
200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#    without specific prior written permission.
210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# SUCH DAMAGE.
330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao__all__ = ['Pool']
360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Imports
390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
410a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport threading
420a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport Queue
430a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport itertools
440a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport collections
450a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport time
460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
470a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom multiprocessing import Process, cpu_count, TimeoutError
480a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom multiprocessing.util import Finalize, debug
490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Constants representing the state of a pool
520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
540a8c90248264a8b26970b4473770bcc3df8515fJosh GaoRUN = 0
550a8c90248264a8b26970b4473770bcc3df8515fJosh GaoCLOSE = 1
560a8c90248264a8b26970b4473770bcc3df8515fJosh GaoTERMINATE = 2
570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Miscellaneous
600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
620a8c90248264a8b26970b4473770bcc3df8515fJosh Gaojob_counter = itertools.count()
630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
640a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef mapstar(args):
650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    return map(*args)
660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Code run by worker processes
690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
710a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass MaybeEncodingError(Exception):
720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    """Wraps possible unpickleable errors, so they can be
730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    safely sent through the socket."""
740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, exc, value):
760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.exc = repr(exc)
770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.value = repr(value)
780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        super(MaybeEncodingError, self).__init__(self.exc, self.value)
790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __str__(self):
810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                                                             self.exc)
830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __repr__(self):
850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return "<MaybeEncodingError: %s>" % str(self)
860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
880a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    put = outqueue.put
910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    get = inqueue.get
920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if hasattr(inqueue, '_writer'):
930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        inqueue._writer.close()
940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        outqueue._reader.close()
950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    if initializer is not None:
970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        initializer(*initargs)
980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    completed = 0
1000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    while maxtasks is None or (maxtasks and completed < maxtasks):
1010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
1020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            task = get()
1030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except (EOFError, IOError):
1040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('worker got EOFError or IOError -- exiting')
1050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            break
1060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if task is None:
1080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('worker got sentinel -- exiting')
1090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            break
1100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        job, i, func, args, kwds = task
1120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
1130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            result = (True, func(*args, **kwds))
1140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except Exception, e:
1150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            result = (False, e)
1160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
1170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            put((job, i, result))
1180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except Exception as e:
1190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            wrapped = MaybeEncodingError(e, result[1])
1200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug("Possible encoding error while sending result: %s" % (
1210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                wrapped))
1220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            put((job, i, (False, wrapped)))
1230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        completed += 1
1240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    debug('worker exiting after %d tasks' % completed)
1250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class representing a process pool
1280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
1290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1300a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass Pool(object):
1310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
1320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Class which supports an async version of the `apply()` builtin
1330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    '''
1340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    Process = Process
1350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, processes=None, initializer=None, initargs=(),
1370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                 maxtasksperchild=None):
1380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._setup_queues()
1390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._taskqueue = Queue.Queue()
1400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cache = {}
1410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._state = RUN
1420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._maxtasksperchild = maxtasksperchild
1430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._initializer = initializer
1440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._initargs = initargs
1450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if processes is None:
1470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
1480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                processes = cpu_count()
1490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except NotImplementedError:
1500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                processes = 1
1510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if processes < 1:
1520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            raise ValueError("Number of processes must be at least 1")
1530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if initializer is not None and not hasattr(initializer, '__call__'):
1550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            raise TypeError('initializer must be a callable')
1560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._processes = processes
1580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._pool = []
1590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._repopulate_pool()
1600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._worker_handler = threading.Thread(
1620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            target=Pool._handle_workers,
1630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            args=(self, )
1640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
1650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._worker_handler.daemon = True
1660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._worker_handler._state = RUN
1670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._worker_handler.start()
1680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._task_handler = threading.Thread(
1710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            target=Pool._handle_tasks,
1720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
1730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
1740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._task_handler.daemon = True
1750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._task_handler._state = RUN
1760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._task_handler.start()
1770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._result_handler = threading.Thread(
1790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            target=Pool._handle_results,
1800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            args=(self._outqueue, self._quick_get, self._cache)
1810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
1820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._result_handler.daemon = True
1830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._result_handler._state = RUN
1840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._result_handler.start()
1850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._terminate = Finalize(
1870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self, self._terminate_pool,
1880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
1890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                  self._worker_handler, self._task_handler,
1900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                  self._result_handler, self._cache),
1910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            exitpriority=15
1920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            )
1930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _join_exited_workers(self):
1950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        """Cleanup after any worker processes which have exited due to reaching
1960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        their specified lifetime.  Returns True if any workers were cleaned up.
1970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        """
1980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cleaned = False
1990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in reversed(range(len(self._pool))):
2000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            worker = self._pool[i]
2010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if worker.exitcode is not None:
2020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                # worker exited
2030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                debug('cleaning up worker %d' % i)
2040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                worker.join()
2050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                cleaned = True
2060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                del self._pool[i]
2070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return cleaned
2080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _repopulate_pool(self):
2100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        """Bring the number of pool processes up to the specified number,
2110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for use after reaping workers which have exited.
2120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        """
2130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for i in range(self._processes - len(self._pool)):
2140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            w = self.Process(target=worker,
2150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                             args=(self._inqueue, self._outqueue,
2160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                                   self._initializer,
2170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                                   self._initargs, self._maxtasksperchild)
2180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                            )
2190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._pool.append(w)
2200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            w.name = w.name.replace('Process', 'PoolWorker')
2210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            w.daemon = True
2220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            w.start()
2230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('added worker')
2240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _maintain_pool(self):
2260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        """Clean up any exited workers and start replacements for them.
2270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        """
2280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self._join_exited_workers():
2290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._repopulate_pool()
2300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _setup_queues(self):
2320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        from .queues import SimpleQueue
2330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._inqueue = SimpleQueue()
2340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._outqueue = SimpleQueue()
2350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._quick_put = self._inqueue._writer.send
2360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._quick_get = self._outqueue._reader.recv
2370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def apply(self, func, args=(), kwds={}):
2390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Equivalent of `apply()` builtin
2410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert self._state == RUN
2430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self.apply_async(func, args, kwds).get()
2440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def map(self, func, iterable, chunksize=None):
2460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Equivalent of `map()` builtin
2480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert self._state == RUN
2500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self.map_async(func, iterable, chunksize).get()
2510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def imap(self, func, iterable, chunksize=1):
2530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
2550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert self._state == RUN
2570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if chunksize == 1:
2580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            result = IMapIterator(self._cache)
2590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._taskqueue.put((((result._job, i, func, (x,), {})
2600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                         for i, x in enumerate(iterable)), result._set_length))
2610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return result
2620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
2630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            assert chunksize > 1
2640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            task_batches = Pool._get_tasks(func, iterable, chunksize)
2650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            result = IMapIterator(self._cache)
2660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
2670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                     for i, x in enumerate(task_batches)), result._set_length))
2680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return (item for chunk in result for item in chunk)
2690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def imap_unordered(self, func, iterable, chunksize=1):
2710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Like `imap()` method but ordering of results is arbitrary
2730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert self._state == RUN
2750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if chunksize == 1:
2760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            result = IMapUnorderedIterator(self._cache)
2770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._taskqueue.put((((result._job, i, func, (x,), {})
2780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                         for i, x in enumerate(iterable)), result._set_length))
2790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return result
2800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
2810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            assert chunksize > 1
2820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            task_batches = Pool._get_tasks(func, iterable, chunksize)
2830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            result = IMapUnorderedIterator(self._cache)
2840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
2850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                     for i, x in enumerate(task_batches)), result._set_length))
2860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return (item for chunk in result for item in chunk)
2870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def apply_async(self, func, args=(), kwds={}, callback=None):
2890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Asynchronous equivalent of `apply()` builtin
2910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert self._state == RUN
2930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        result = ApplyResult(self._cache, callback)
2940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
2950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return result
2960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def map_async(self, func, iterable, chunksize=None, callback=None):
2980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
2990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Asynchronous equivalent of `map()` builtin
3000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        '''
3010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert self._state == RUN
3020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if not hasattr(iterable, '__len__'):
3030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            iterable = list(iterable)
3040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if chunksize is None:
3060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
3070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if extra:
3080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                chunksize += 1
3090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if len(iterable) == 0:
3100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            chunksize = 0
3110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        task_batches = Pool._get_tasks(func, iterable, chunksize)
3130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        result = MapResult(self._cache, chunksize, len(iterable), callback)
3140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
3150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                              for i, x in enumerate(task_batches)), None))
3160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return result
3170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @staticmethod
3190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _handle_workers(pool):
3200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        thread = threading.current_thread()
3210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Keep maintaining workers until the cache gets drained, unless the pool
3230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # is terminated.
3240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
3250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            pool._maintain_pool()
3260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            time.sleep(0.1)
3270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # send sentinel to stop workers
3280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        pool._taskqueue.put(None)
3290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('worker handler exiting')
3300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @staticmethod
3320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _handle_tasks(taskqueue, put, outqueue, pool):
3330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        thread = threading.current_thread()
3340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for taskseq, set_length in iter(taskqueue.get, None):
3360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            i = -1
3370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for i, task in enumerate(taskseq):
3380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if thread._state:
3390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    debug('task handler found thread._state != RUN')
3400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    break
3410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                try:
3420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    put(task)
3430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                except IOError:
3440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    debug('could not put task on queue')
3450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    break
3460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            else:
3470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if set_length:
3480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    debug('doing set_length()')
3490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    set_length(i+1)
3500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                continue
3510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            break
3520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
3530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('task handler got sentinel')
3540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
3570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # tell result handler to finish when cache is empty
3580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('task handler sending sentinel to result handler')
3590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            outqueue.put(None)
3600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # tell workers there is no more work
3620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('task handler sending sentinel to workers')
3630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for p in pool:
3640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                put(None)
3650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        except IOError:
3660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('task handler got IOError when sending sentinels')
3670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('task handler exiting')
3690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @staticmethod
3710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _handle_results(outqueue, get, cache):
3720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        thread = threading.current_thread()
3730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        while 1:
3750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
3760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                task = get()
3770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except (IOError, EOFError):
3780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                debug('result handler got EOFError/IOError -- exiting')
3790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                return
3800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if thread._state:
3820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                assert thread._state == TERMINATE
3830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                debug('result handler found thread._state=TERMINATE')
3840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                break
3850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if task is None:
3870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                debug('result handler got sentinel')
3880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                break
3890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            job, i, obj = task
3910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
3920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                cache[job]._set(i, obj)
3930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except KeyError:
3940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                pass
3950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
3960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        while cache and thread._state != TERMINATE:
3970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
3980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                task = get()
3990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except (IOError, EOFError):
4000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                debug('result handler got EOFError/IOError -- exiting')
4010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                return
4020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if task is None:
4040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                debug('result handler ignoring extra sentinel')
4050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                continue
4060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            job, i, obj = task
4070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
4080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                cache[job]._set(i, obj)
4090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except KeyError:
4100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                pass
4110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if hasattr(outqueue, '_reader'):
4130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('ensuring that outqueue is not full')
4140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # If we don't make room available in outqueue then
4150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # attempts to add the sentinel (None) to outqueue may
4160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # block.  There is guaranteed to be no more than 2 sentinels.
4170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
4180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                for i in range(10):
4190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    if not outqueue._reader.poll():
4200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                        break
4210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    get()
4220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except (IOError, EOFError):
4230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                pass
4240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('result handler exiting: len(cache)=%s, thread._state=%s',
4260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao              len(cache), thread._state)
4270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @staticmethod
4290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _get_tasks(func, it, size):
4300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        it = iter(it)
4310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        while 1:
4320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            x = tuple(itertools.islice(it, size))
4330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if not x:
4340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                return
4350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            yield (func, x)
4360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __reduce__(self):
4380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise NotImplementedError(
4390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao              'pool objects cannot be passed between processes or pickled'
4400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao              )
4410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def close(self):
4430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('closing pool')
4440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self._state == RUN:
4450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._state = CLOSE
4460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._worker_handler._state = CLOSE
4470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def terminate(self):
4490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('terminating pool')
4500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._state = TERMINATE
4510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._worker_handler._state = TERMINATE
4520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._terminate()
4530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def join(self):
4550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('joining pool')
4560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert self._state in (CLOSE, TERMINATE)
4570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._worker_handler.join()
4580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._task_handler.join()
4590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._result_handler.join()
4600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        for p in self._pool:
4610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            p.join()
4620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @staticmethod
4640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _help_stuff_finish(inqueue, task_handler, size):
4650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # task_handler may be blocked trying to put items on inqueue
4660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('removing tasks from inqueue until task handler finished')
4670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        inqueue._rlock.acquire()
4680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        while task_handler.is_alive() and inqueue._reader.poll():
4690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            inqueue._reader.recv()
4700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            time.sleep(0)
4710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @classmethod
4730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
4740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                        worker_handler, task_handler, result_handler, cache):
4750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # this is guaranteed to only be called once
4760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('finalizing pool')
4770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        worker_handler._state = TERMINATE
4790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        task_handler._state = TERMINATE
4800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('helping task handler/workers to finish')
4820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cls._help_stuff_finish(inqueue, task_handler, len(pool))
4830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert result_handler.is_alive() or len(cache) == 0
4850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        result_handler._state = TERMINATE
4870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        outqueue.put(None)                  # sentinel
4880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # We must wait for the worker handler to exit before terminating
4900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # workers because we don't want workers to be restarted behind our back.
4910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('joining worker handler')
4920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if threading.current_thread() is not worker_handler:
4930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            worker_handler.join(1e100)
4940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
4950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Terminate workers which haven't already finished.
4960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if pool and hasattr(pool[0], 'terminate'):
4970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('terminating workers')
4980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for p in pool:
4990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if p.exitcode is None:
5000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    p.terminate()
5010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('joining task handler')
5030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if threading.current_thread() is not task_handler:
5040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            task_handler.join(1e100)
5050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        debug('joining result handler')
5070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if threading.current_thread() is not result_handler:
5080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            result_handler.join(1e100)
5090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if pool and hasattr(pool[0], 'terminate'):
5110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            debug('joining pool workers')
5120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            for p in pool:
5130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if p.is_alive():
5140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    # worker has not yet exited
5150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    debug('cleaning up worker %d' % p.pid)
5160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    p.join()
5170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
5190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class whose instances are returned by `Pool.apply_async()`
5200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
5210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5220a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ApplyResult(object):
5230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, cache, callback):
5250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cond = threading.Condition(threading.Lock())
5260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._job = job_counter.next()
5270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cache = cache
5280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._ready = False
5290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._callback = callback
5300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cache[self._job] = self
5310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def ready(self):
5330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self._ready
5340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def successful(self):
5360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        assert self._ready
5370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self._success
5380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def wait(self, timeout=None):
5400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cond.acquire()
5410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
5420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if not self._ready:
5430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._cond.wait(timeout)
5440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
5450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cond.release()
5460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def get(self, timeout=None):
5480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.wait(timeout)
5490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if not self._ready:
5500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            raise TimeoutError
5510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self._success:
5520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return self._value
5530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
5540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            raise self._value
5550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _set(self, i, obj):
5570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._success, self._value = obj
5580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if self._callback and self._success:
5590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._callback(self._value)
5600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cond.acquire()
5610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
5620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._ready = True
5630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cond.notify()
5640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
5650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cond.release()
5660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        del self._cache[self._job]
5670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5680a8c90248264a8b26970b4473770bcc3df8515fJosh GaoAsyncResult = ApplyResult       # create alias -- see #17805
5690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
5710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class whose instances are returned by `Pool.map_async()`
5720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
5730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5740a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass MapResult(ApplyResult):
5750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, cache, chunksize, length, callback):
5770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        ApplyResult.__init__(self, cache, callback)
5780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._success = True
5790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._value = [None] * length
5800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._chunksize = chunksize
5810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if chunksize <= 0:
5820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._number_left = 0
5830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._ready = True
5840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            del cache[self._job]
5850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
5860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._number_left = length//chunksize + bool(length % chunksize)
5870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
5880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _set(self, i, success_result):
5890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        success, result = success_result
5900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if success:
5910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
5920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._number_left -= 1
5930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if self._number_left == 0:
5940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if self._callback:
5950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    self._callback(self._value)
5960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                del self._cache[self._job]
5970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._cond.acquire()
5980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                try:
5990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    self._ready = True
6000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    self._cond.notify()
6010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                finally:
6020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    self._cond.release()
6030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        else:
6050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._success = False
6060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._value = result
6070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            del self._cache[self._job]
6080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cond.acquire()
6090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
6100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._ready = True
6110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._cond.notify()
6120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            finally:
6130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._cond.release()
6140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
6160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class whose instances are returned by `Pool.imap()`
6170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
6180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6190a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass IMapIterator(object):
6200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, cache):
6220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cond = threading.Condition(threading.Lock())
6230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._job = job_counter.next()
6240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cache = cache
6250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._items = collections.deque()
6260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._index = 0
6270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._length = None
6280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._unsorted = {}
6290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        cache[self._job] = self
6300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __iter__(self):
6320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return self
6330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def next(self, timeout=None):
6350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cond.acquire()
6360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
6370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            try:
6380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                item = self._items.popleft()
6390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            except IndexError:
6400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if self._index == self._length:
6410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    raise StopIteration
6420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._cond.wait(timeout)
6430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                try:
6440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    item = self._items.popleft()
6450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                except IndexError:
6460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    if self._index == self._length:
6470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                        raise StopIteration
6480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    raise TimeoutError
6490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
6500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cond.release()
6510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        success, value = item
6530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if success:
6540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            return value
6550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        raise value
6560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    __next__ = next                    # XXX
6580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _set(self, i, obj):
6600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cond.acquire()
6610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
6620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if self._index == i:
6630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._items.append(obj)
6640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._index += 1
6650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                while self._index in self._unsorted:
6660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    obj = self._unsorted.pop(self._index)
6670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    self._items.append(obj)
6680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    self._index += 1
6690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._cond.notify()
6700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            else:
6710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._unsorted[i] = obj
6720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if self._index == self._length:
6740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                del self._cache[self._job]
6750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
6760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cond.release()
6770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _set_length(self, length):
6790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cond.acquire()
6800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
6810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._length = length
6820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if self._index == self._length:
6830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._cond.notify()
6840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                del self._cache[self._job]
6850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
6860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cond.release()
6870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
6890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# Class whose instances are returned by `Pool.imap_unordered()`
6900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
6910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6920a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass IMapUnorderedIterator(IMapIterator):
6930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
6940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _set(self, i, obj):
6950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._cond.acquire()
6960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
6970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._items.append(obj)
6980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._index += 1
6990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cond.notify()
7000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if self._index == self._length:
7010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                del self._cache[self._job]
7020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
7030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self._cond.release()
7040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
7060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
7070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao#
7080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7090a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ThreadPool(Pool):
7100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    from .dummy import Process
7120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def __init__(self, processes=None, initializer=None, initargs=()):
7140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        Pool.__init__(self, processes, initializer, initargs)
7150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _setup_queues(self):
7170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._inqueue = Queue.Queue()
7180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._outqueue = Queue.Queue()
7190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._quick_put = self._inqueue.put
7200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._quick_get = self._outqueue.get
7210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
7220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @staticmethod
7230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _help_stuff_finish(inqueue, task_handler, size):
7240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # put sentinels at head of inqueue to make workers finish
7250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        inqueue.not_empty.acquire()
7260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
7270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            inqueue.queue.clear()
7280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            inqueue.queue.extend([None] * size)
7290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            inqueue.not_empty.notify_all()
7300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
7310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            inqueue.not_empty.release()
732