14adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
24adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Module providing the `Pool` class for managing a process pool
34adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
44adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# multiprocessing/pool.py
54adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
64adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Copyright (c) 2006-2008, R Oudkerk
74adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# All rights reserved.
84adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
94adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Redistribution and use in source and binary forms, with or without
104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# modification, are permitted provided that the following conditions
114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# are met:
124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 1. Redistributions of source code must retain the above copyright
144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    notice, this list of conditions and the following disclaimer.
154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 2. Redistributions in binary form must reproduce the above copyright
164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    notice, this list of conditions and the following disclaimer in the
174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    documentation and/or other materials provided with the distribution.
184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 3. Neither the name of author nor the names of any contributors may be
194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    used to endorse or promote products derived from this software
204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    without specific prior written permission.
214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# SUCH DAMAGE.
334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao__all__ = ['Pool']
364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Imports
394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport threading
424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport Queue
434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport itertools
444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport collections
454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport time
464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing import Process, cpu_count, TimeoutError
484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing.util import Finalize, debug
494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Constants representing the state of a pool
524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
544adfde8bc82dd39f59e0445588c3e599ada477dJosh GaoRUN = 0
554adfde8bc82dd39f59e0445588c3e599ada477dJosh GaoCLOSE = 1
564adfde8bc82dd39f59e0445588c3e599ada477dJosh GaoTERMINATE = 2
574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Miscellaneous
604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaojob_counter = itertools.count()
634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaodef mapstar(args):
654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    return map(*args)
664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Code run by worker processes
694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass MaybeEncodingError(Exception):
724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    """Wraps possible unpickleable errors, so they can be
734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    safely sent through the socket."""
744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __init__(self, exc, value):
764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self.exc = repr(exc)
774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self.value = repr(value)
784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        super(MaybeEncodingError, self).__init__(self.exc, self.value)
794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __str__(self):
814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                                                             self.exc)
834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __repr__(self):
854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return "<MaybeEncodingError: %s>" % str(self)
864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaodef worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    put = outqueue.put
914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    get = inqueue.get
924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    if hasattr(inqueue, '_writer'):
934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        inqueue._writer.close()
944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        outqueue._reader.close()
954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    if initializer is not None:
974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        initializer(*initargs)
984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    completed = 0
1004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    while maxtasks is None or (maxtasks and completed < maxtasks):
1014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
1024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            task = get()
1034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        except (EOFError, IOError):
1044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('worker got EOFError or IOError -- exiting')
1054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            break
1064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if task is None:
1084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('worker got sentinel -- exiting')
1094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            break
1104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        job, i, func, args, kwds = task
1124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
1134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            result = (True, func(*args, **kwds))
1144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        except Exception, e:
1154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            result = (False, e)
1164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
1174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            put((job, i, result))
1184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        except Exception as e:
1194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            wrapped = MaybeEncodingError(e, result[1])
1204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug("Possible encoding error while sending result: %s" % (
1214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                wrapped))
1224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            put((job, i, (False, wrapped)))
1234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        completed += 1
1244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    debug('worker exiting after %d tasks' % completed)
1254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
1274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Class representing a process pool
1284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
1294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass Pool(object):
1314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    '''
1324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    Class which supports an async version of the `apply()` builtin
1334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    '''
1344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    Process = Process
1354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __init__(self, processes=None, initializer=None, initargs=(),
1374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                 maxtasksperchild=None):
1384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._setup_queues()
1394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._taskqueue = Queue.Queue()
1404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cache = {}
1414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._state = RUN
1424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._maxtasksperchild = maxtasksperchild
1434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._initializer = initializer
1444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._initargs = initargs
1454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if processes is None:
1474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            try:
1484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                processes = cpu_count()
1494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            except NotImplementedError:
1504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                processes = 1
1514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if processes < 1:
1524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            raise ValueError("Number of processes must be at least 1")
1534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if initializer is not None and not hasattr(initializer, '__call__'):
1554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            raise TypeError('initializer must be a callable')
1564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._processes = processes
1584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._pool = []
1594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._repopulate_pool()
1604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._worker_handler = threading.Thread(
1624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            target=Pool._handle_workers,
1634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            args=(self, )
1644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            )
1654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._worker_handler.daemon = True
1664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._worker_handler._state = RUN
1674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._worker_handler.start()
1684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._task_handler = threading.Thread(
1714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            target=Pool._handle_tasks,
1724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
1734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            )
1744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._task_handler.daemon = True
1754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._task_handler._state = RUN
1764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._task_handler.start()
1774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._result_handler = threading.Thread(
1794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            target=Pool._handle_results,
1804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            args=(self._outqueue, self._quick_get, self._cache)
1814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            )
1824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._result_handler.daemon = True
1834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._result_handler._state = RUN
1844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._result_handler.start()
1854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._terminate = Finalize(
1874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self, self._terminate_pool,
1884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
1894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                  self._worker_handler, self._task_handler,
1904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                  self._result_handler, self._cache),
1914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            exitpriority=15
1924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            )
1934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
1944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _join_exited_workers(self):
1954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        """Cleanup after any worker processes which have exited due to reaching
1964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        their specified lifetime.  Returns True if any workers were cleaned up.
1974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        """
1984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        cleaned = False
1994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        for i in reversed(range(len(self._pool))):
2004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            worker = self._pool[i]
2014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if worker.exitcode is not None:
2024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                # worker exited
2034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                debug('cleaning up worker %d' % i)
2044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                worker.join()
2054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                cleaned = True
2064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                del self._pool[i]
2074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return cleaned
2084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _repopulate_pool(self):
2104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        """Bring the number of pool processes up to the specified number,
2114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        for use after reaping workers which have exited.
2124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        """
2134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        for i in range(self._processes - len(self._pool)):
2144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            w = self.Process(target=worker,
2154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                             args=(self._inqueue, self._outqueue,
2164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                                   self._initializer,
2174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                                   self._initargs, self._maxtasksperchild)
2184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                            )
2194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._pool.append(w)
2204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            w.name = w.name.replace('Process', 'PoolWorker')
2214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            w.daemon = True
2224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            w.start()
2234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('added worker')
2244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _maintain_pool(self):
2264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        """Clean up any exited workers and start replacements for them.
2274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        """
2284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if self._join_exited_workers():
2294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._repopulate_pool()
2304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _setup_queues(self):
2324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        from .queues import SimpleQueue
2334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._inqueue = SimpleQueue()
2344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._outqueue = SimpleQueue()
2354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._quick_put = self._inqueue._writer.send
2364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._quick_get = self._outqueue._reader.recv
2374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def apply(self, func, args=(), kwds={}):
2394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        Equivalent of `apply()` builtin
2414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        assert self._state == RUN
2434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return self.apply_async(func, args, kwds).get()
2444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def map(self, func, iterable, chunksize=None):
2464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        Equivalent of `map()` builtin
2484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        assert self._state == RUN
2504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return self.map_async(func, iterable, chunksize).get()
2514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def imap(self, func, iterable, chunksize=1):
2534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
2554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        assert self._state == RUN
2574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if chunksize == 1:
2584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            result = IMapIterator(self._cache)
2594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._taskqueue.put((((result._job, i, func, (x,), {})
2604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                         for i, x in enumerate(iterable)), result._set_length))
2614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            return result
2624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        else:
2634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            assert chunksize > 1
2644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            task_batches = Pool._get_tasks(func, iterable, chunksize)
2654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            result = IMapIterator(self._cache)
2664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
2674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                     for i, x in enumerate(task_batches)), result._set_length))
2684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            return (item for chunk in result for item in chunk)
2694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def imap_unordered(self, func, iterable, chunksize=1):
2714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        Like `imap()` method but ordering of results is arbitrary
2734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        assert self._state == RUN
2754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if chunksize == 1:
2764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            result = IMapUnorderedIterator(self._cache)
2774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._taskqueue.put((((result._job, i, func, (x,), {})
2784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                         for i, x in enumerate(iterable)), result._set_length))
2794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            return result
2804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        else:
2814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            assert chunksize > 1
2824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            task_batches = Pool._get_tasks(func, iterable, chunksize)
2834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            result = IMapUnorderedIterator(self._cache)
2844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
2854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                     for i, x in enumerate(task_batches)), result._set_length))
2864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            return (item for chunk in result for item in chunk)
2874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def apply_async(self, func, args=(), kwds={}, callback=None):
2894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        Asynchronous equivalent of `apply()` builtin
2914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        assert self._state == RUN
2934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        result = ApplyResult(self._cache, callback)
2944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
2954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return result
2964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def map_async(self, func, iterable, chunksize=None, callback=None):
2984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
2994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        Asynchronous equivalent of `map()` builtin
3004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        '''
3014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        assert self._state == RUN
3024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if not hasattr(iterable, '__len__'):
3034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            iterable = list(iterable)
3044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if chunksize is None:
3064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
3074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if extra:
3084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                chunksize += 1
3094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if len(iterable) == 0:
3104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            chunksize = 0
3114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        task_batches = Pool._get_tasks(func, iterable, chunksize)
3134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        result = MapResult(self._cache, chunksize, len(iterable), callback)
3144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
3154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                              for i, x in enumerate(task_batches)), None))
3164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return result
3174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    @staticmethod
3194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _handle_workers(pool):
3204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        thread = threading.current_thread()
3214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        # Keep maintaining workers until the cache gets drained, unless the pool
3234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        # is terminated.
3244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
3254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            pool._maintain_pool()
3264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            time.sleep(0.1)
3274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        # send sentinel to stop workers
3284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        pool._taskqueue.put(None)
3294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('worker handler exiting')
3304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    @staticmethod
3324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _handle_tasks(taskqueue, put, outqueue, pool):
3334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        thread = threading.current_thread()
3344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        for taskseq, set_length in iter(taskqueue.get, None):
3364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            i = -1
3374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            for i, task in enumerate(taskseq):
3384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                if thread._state:
3394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    debug('task handler found thread._state != RUN')
3404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    break
3414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                try:
3424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    put(task)
3434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                except IOError:
3444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    debug('could not put task on queue')
3454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    break
3464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            else:
3474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                if set_length:
3484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    debug('doing set_length()')
3494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    set_length(i+1)
3504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                continue
3514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            break
3524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        else:
3534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('task handler got sentinel')
3544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
3574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            # tell result handler to finish when cache is empty
3584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('task handler sending sentinel to result handler')
3594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            outqueue.put(None)
3604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            # tell workers there is no more work
3624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('task handler sending sentinel to workers')
3634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            for p in pool:
3644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                put(None)
3654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        except IOError:
3664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('task handler got IOError when sending sentinels')
3674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('task handler exiting')
3694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    @staticmethod
3714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _handle_results(outqueue, get, cache):
3724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        thread = threading.current_thread()
3734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        while 1:
3754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            try:
3764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                task = get()
3774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            except (IOError, EOFError):
3784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                debug('result handler got EOFError/IOError -- exiting')
3794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                return
3804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if thread._state:
3824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                assert thread._state == TERMINATE
3834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                debug('result handler found thread._state=TERMINATE')
3844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                break
3854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if task is None:
3874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                debug('result handler got sentinel')
3884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                break
3894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            job, i, obj = task
3914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            try:
3924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                cache[job]._set(i, obj)
3934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            except KeyError:
3944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                pass
3954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        while cache and thread._state != TERMINATE:
3974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            try:
3984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                task = get()
3994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            except (IOError, EOFError):
4004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                debug('result handler got EOFError/IOError -- exiting')
4014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                return
4024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if task is None:
4044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                debug('result handler ignoring extra sentinel')
4054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                continue
4064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            job, i, obj = task
4074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            try:
4084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                cache[job]._set(i, obj)
4094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            except KeyError:
4104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                pass
4114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if hasattr(outqueue, '_reader'):
4134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('ensuring that outqueue is not full')
4144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            # If we don't make room available in outqueue then
4154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            # attempts to add the sentinel (None) to outqueue may
4164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            # block.  There is guaranteed to be no more than 2 sentinels.
4174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            try:
4184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                for i in range(10):
4194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    if not outqueue._reader.poll():
4204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                        break
4214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    get()
4224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            except (IOError, EOFError):
4234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                pass
4244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('result handler exiting: len(cache)=%s, thread._state=%s',
4264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao              len(cache), thread._state)
4274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    @staticmethod
4294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _get_tasks(func, it, size):
4304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        it = iter(it)
4314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        while 1:
4324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            x = tuple(itertools.islice(it, size))
4334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if not x:
4344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                return
4354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            yield (func, x)
4364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __reduce__(self):
4384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        raise NotImplementedError(
4394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao              'pool objects cannot be passed between processes or pickled'
4404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao              )
4414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def close(self):
4434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('closing pool')
4444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if self._state == RUN:
4454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._state = CLOSE
4464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._worker_handler._state = CLOSE
4474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def terminate(self):
4494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('terminating pool')
4504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._state = TERMINATE
4514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._worker_handler._state = TERMINATE
4524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._terminate()
4534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def join(self):
4554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('joining pool')
4564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        assert self._state in (CLOSE, TERMINATE)
4574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._worker_handler.join()
4584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._task_handler.join()
4594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._result_handler.join()
4604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        for p in self._pool:
4614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            p.join()
4624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    @staticmethod
4644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _help_stuff_finish(inqueue, task_handler, size):
4654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        # task_handler may be blocked trying to put items on inqueue
4664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('removing tasks from inqueue until task handler finished')
4674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        inqueue._rlock.acquire()
4684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        while task_handler.is_alive() and inqueue._reader.poll():
4694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            inqueue._reader.recv()
4704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            time.sleep(0)
4714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    @classmethod
4734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
4744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                        worker_handler, task_handler, result_handler, cache):
4754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        # this is guaranteed to only be called once
4764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('finalizing pool')
4774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        worker_handler._state = TERMINATE
4794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        task_handler._state = TERMINATE
4804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('helping task handler/workers to finish')
4824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        cls._help_stuff_finish(inqueue, task_handler, len(pool))
4834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        assert result_handler.is_alive() or len(cache) == 0
4854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        result_handler._state = TERMINATE
4874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        outqueue.put(None)                  # sentinel
4884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        # We must wait for the worker handler to exit before terminating
4904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        # workers because we don't want workers to be restarted behind our back.
4914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('joining worker handler')
4924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if threading.current_thread() is not worker_handler:
4934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            worker_handler.join(1e100)
4944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
4954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        # Terminate workers which haven't already finished.
4964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if pool and hasattr(pool[0], 'terminate'):
4974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('terminating workers')
4984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            for p in pool:
4994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                if p.exitcode is None:
5004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    p.terminate()
5014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('joining task handler')
5034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if threading.current_thread() is not task_handler:
5044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            task_handler.join(1e100)
5054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        debug('joining result handler')
5074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if threading.current_thread() is not result_handler:
5084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            result_handler.join(1e100)
5094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if pool and hasattr(pool[0], 'terminate'):
5114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            debug('joining pool workers')
5124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            for p in pool:
5134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                if p.is_alive():
5144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    # worker has not yet exited
5154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    debug('cleaning up worker %d' % p.pid)
5164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    p.join()
5174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
5194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Class whose instances are returned by `Pool.apply_async()`
5204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
5214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass ApplyResult(object):
5234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __init__(self, cache, callback):
5254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cond = threading.Condition(threading.Lock())
5264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._job = job_counter.next()
5274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cache = cache
5284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._ready = False
5294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._callback = callback
5304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        cache[self._job] = self
5314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def ready(self):
5334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return self._ready
5344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def successful(self):
5364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        assert self._ready
5374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return self._success
5384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def wait(self, timeout=None):
5404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cond.acquire()
5414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
5424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if not self._ready:
5434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._cond.wait(timeout)
5444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        finally:
5454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._cond.release()
5464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def get(self, timeout=None):
5484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self.wait(timeout)
5494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if not self._ready:
5504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            raise TimeoutError
5514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if self._success:
5524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            return self._value
5534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        else:
5544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            raise self._value
5554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _set(self, i, obj):
5574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._success, self._value = obj
5584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if self._callback and self._success:
5594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._callback(self._value)
5604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cond.acquire()
5614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
5624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._ready = True
5634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._cond.notify()
5644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        finally:
5654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._cond.release()
5664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        del self._cache[self._job]
5674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5684adfde8bc82dd39f59e0445588c3e599ada477dJosh GaoAsyncResult = ApplyResult       # create alias -- see #17805
5694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
5714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Class whose instances are returned by `Pool.map_async()`
5724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
5734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass MapResult(ApplyResult):
5754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __init__(self, cache, chunksize, length, callback):
5774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        ApplyResult.__init__(self, cache, callback)
5784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._success = True
5794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._value = [None] * length
5804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._chunksize = chunksize
5814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if chunksize <= 0:
5824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._number_left = 0
5834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._ready = True
5844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            del cache[self._job]
5854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        else:
5864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._number_left = length//chunksize + bool(length % chunksize)
5874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
5884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _set(self, i, success_result):
5894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        success, result = success_result
5904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if success:
5914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
5924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._number_left -= 1
5934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if self._number_left == 0:
5944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                if self._callback:
5954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    self._callback(self._value)
5964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                del self._cache[self._job]
5974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._cond.acquire()
5984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                try:
5994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    self._ready = True
6004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    self._cond.notify()
6014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                finally:
6024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    self._cond.release()
6034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        else:
6054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._success = False
6064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._value = result
6074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            del self._cache[self._job]
6084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._cond.acquire()
6094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            try:
6104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._ready = True
6114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._cond.notify()
6124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            finally:
6134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._cond.release()
6144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
6164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Class whose instances are returned by `Pool.imap()`
6174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
6184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass IMapIterator(object):
6204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __init__(self, cache):
6224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cond = threading.Condition(threading.Lock())
6234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._job = job_counter.next()
6244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cache = cache
6254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._items = collections.deque()
6264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._index = 0
6274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._length = None
6284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._unsorted = {}
6294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        cache[self._job] = self
6304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __iter__(self):
6324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        return self
6334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def next(self, timeout=None):
6354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cond.acquire()
6364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
6374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            try:
6384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                item = self._items.popleft()
6394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            except IndexError:
6404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                if self._index == self._length:
6414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    raise StopIteration
6424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._cond.wait(timeout)
6434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                try:
6444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    item = self._items.popleft()
6454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                except IndexError:
6464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    if self._index == self._length:
6474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                        raise StopIteration
6484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    raise TimeoutError
6494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        finally:
6504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._cond.release()
6514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        success, value = item
6534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        if success:
6544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            return value
6554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        raise value
6564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    __next__ = next                    # XXX
6584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _set(self, i, obj):
6604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cond.acquire()
6614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
6624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if self._index == i:
6634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._items.append(obj)
6644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._index += 1
6654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                while self._index in self._unsorted:
6664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    obj = self._unsorted.pop(self._index)
6674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    self._items.append(obj)
6684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                    self._index += 1
6694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._cond.notify()
6704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            else:
6714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._unsorted[i] = obj
6724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if self._index == self._length:
6744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                del self._cache[self._job]
6754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        finally:
6764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._cond.release()
6774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _set_length(self, length):
6794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cond.acquire()
6804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
6814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._length = length
6824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if self._index == self._length:
6834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                self._cond.notify()
6844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                del self._cache[self._job]
6854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        finally:
6864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._cond.release()
6874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
6894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Class whose instances are returned by `Pool.imap_unordered()`
6904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
6914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass IMapUnorderedIterator(IMapIterator):
6934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
6944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _set(self, i, obj):
6954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._cond.acquire()
6964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
6974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._items.append(obj)
6984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._index += 1
6994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._cond.notify()
7004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            if self._index == self._length:
7014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao                del self._cache[self._job]
7024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        finally:
7034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            self._cond.release()
7044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
7054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
7064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
7074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
7084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
7094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass ThreadPool(Pool):
7104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
7114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    from .dummy import Process
7124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
7134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def __init__(self, processes=None, initializer=None, initargs=()):
7144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        Pool.__init__(self, processes, initializer, initargs)
7154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
7164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _setup_queues(self):
7174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._inqueue = Queue.Queue()
7184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._outqueue = Queue.Queue()
7194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._quick_put = self._inqueue.put
7204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        self._quick_get = self._outqueue.get
7214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
7224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    @staticmethod
7234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao    def _help_stuff_finish(inqueue, task_handler, size):
7244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        # put sentinels at head of inqueue to make workers finish
7254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        inqueue.not_empty.acquire()
7264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        try:
7274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            inqueue.queue.clear()
7284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            inqueue.queue.extend([None] * size)
7294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            inqueue.not_empty.notify_all()
7304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao        finally:
7314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao            inqueue.not_empty.release()
732