14adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
24adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Module implementing queues
34adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
44adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# multiprocessing/queues.py
54adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
64adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Copyright (c) 2006-2008, R Oudkerk
74adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# All rights reserved.
84adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
94adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Redistribution and use in source and binary forms, with or without
104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# modification, are permitted provided that the following conditions
114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# are met:
124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 1. Redistributions of source code must retain the above copyright
144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    notice, this list of conditions and the following disclaimer.
154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 2. Redistributions in binary form must reproduce the above copyright
164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    notice, this list of conditions and the following disclaimer in the
174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    documentation and/or other materials provided with the distribution.
184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 3. Neither the name of author nor the names of any contributors may be
194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    used to endorse or promote products derived from this software
204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#    without specific prior written permission.
214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# SUCH DAMAGE.
334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
# Public names exported by this module (multiprocessing/queues.py).
__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport sys
384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport os
394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport threading
404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport collections
414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport time
424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport atexit
434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport weakref
444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom Queue import Empty, Full
464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport _multiprocessing
474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing import Pipe
484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing.util import debug, info, Finalize, register_after_fork
504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing.forking import assert_spawning
514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Queue type using a pipe, buffer and thread
544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
class Queue(object):
    """Queue type using a pipe, a buffer and a feeder thread.

    ``put()`` appends objects to a local deque; a daemon thread started on
    the first ``put()`` (see ``_feed``) pops them and sends them down the
    write end of a pipe.  ``get()`` receives objects from the read end.
    A bounded semaphore is acquired by ``put()`` and released by ``get()``
    so that at most ``maxsize`` items are outstanding.
    """

    def __init__(self, maxsize=0):
        """Create the pipe, locks and semaphore backing the queue.

        A ``maxsize`` of zero or less is replaced by
        ``_multiprocessing.SemLock.SEM_VALUE_MAX``.
        """
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        # pid of the process that created the queue
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock is used on win32.
            self._wlock = None
        else:
            self._wlock = Lock()
        self._sem = BoundedSemaphore(maxsize)

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the picklable state; only permitted while spawning."""
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the shared state and rebuild the per-process state."""
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """(Re)initialise the state that is local to the current process.

        Called from ``__init__`` and ``__setstate__``, and registered with
        ``register_after_fork`` on non-win32 platforms.
        """
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        # Cache bound connection methods.
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Buffer ``obj`` for the feeder thread, starting it if necessary.

        Raises ``Full`` if the semaphore cannot be acquired with the given
        ``block``/``timeout``.  Putting on a closed queue trips an
        ``assert``.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()
        finally:
            self._notempty.release()

    def get(self, block=True, timeout=None):
        """Receive and return the next object from the pipe.

        With ``block`` true and no ``timeout`` this waits indefinitely.
        Otherwise ``Empty`` is raised if the read lock cannot be acquired
        or no data is available before the timeout expires (immediately
        when ``block`` is false).
        """
        if block and timeout is None:
            self._rlock.acquire()
            try:
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

        else:
            if block:
                deadline = time.time() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only wait for whatever time is left after taking the
                    # read lock.
                    timeout = deadline - time.time()
                    if timeout < 0 or not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

    def qsize(self):
        """Return ``maxsize`` minus the current value of the semaphore."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if the read end of the pipe has nothing to receive."""
        return not self._poll()

    def full(self):
        """Return True if the semaphore has reached zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to ``put(obj, False)``."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the reader and run the close finalizer."""
        self._closed = True
        try:
            self._reader.close()
        finally:
            # Run the finalizer even if closing the reader failed, so the
            # feeder thread is still told to quit.
            if self._close:
                self._close()

    def join_thread(self):
        """Run the join finalizer, if any; the queue must already be closed."""
        debug('Queue.join_thread()')
        assert self._closed
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being registered."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # No feeder thread has been started yet (_jointhread is None).
            pass

    def _start_thread(self):
        """Start the daemon feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send,
                  self._wlock, self._writer.close),
            name='QueueFeederThread'
            )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by the weak reference ``twr``."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to ``buffer`` and wake the feeder thread."""
        debug('telling queue thread to quit')
        notempty.acquire()
        try:
            buffer.append(_sentinel)
            notempty.notify()
        finally:
            notempty.release()

    @staticmethod
    def _feed(buffer, notempty, send, writelock, close):
        """Feeder thread body: move objects from ``buffer`` to the pipe.

        Waits on ``notempty`` while ``buffer`` is empty, then drains it with
        ``send`` (holding ``writelock`` when one is used).  On popping the
        sentinel it calls ``close`` and returns.
        """
        debug('starting thread to feed data to pipe')
        from .util import is_exiting

        # Bind frequently used callables to local names.
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None
            wrelease = None

        try:
            while 1:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        if wacquire is None:
                            send(obj)
                        else:
                            wacquire()
                            try:
                                send(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained; go back to waiting on the condition.
                    pass
        except Exception as e:
            # Since this runs in a daemon thread the resources it uses
            # may become unusable while the process is cleaning up.
            # We ignore errors which happen after the process has
            # started to cleanup.
            try:
                if is_exiting():
                    info('error in queue thread: %s', e)
                else:
                    import traceback
                    traceback.print_exc()
            except Exception:
                # Best effort only: reporting itself may fail during cleanup.
                pass
2844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
# Unique marker object: Queue._finalize_close() appends it to the buffer and
# Queue._feed() exits when it pops it.
_sentinel = object()
2864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
2874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
2884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# A queue type which also supports join() and task_done() methods
2894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
2904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Note that if you do not call task_done() for each finished task then
2914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# eventually the counter's semaphore may overflow causing Bad Things
2924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# to happen.
2934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
2944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
class JoinableQueue(Queue):
    """Queue that additionally supports ``task_done()`` and ``join()``.

    A semaphore counts unfinished tasks: ``put()`` releases it once per
    item and ``task_done()`` acquires it once per finished item.  A
    condition is notified when the count reaches zero so that ``join()``
    can return.
    """

    def __init__(self, maxsize=0):
        """Initialise the base queue plus the task counter and condition."""
        Queue.__init__(self, maxsize)
        self._unfinished_tasks = Semaphore(0)
        self._cond = Condition()

    def __getstate__(self):
        """Return the base queue state extended with the join machinery."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the base queue state and the two trailing extra fields."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Buffer ``obj`` and count it as an unfinished task.

        Raises ``Full`` if the semaphore cannot be acquired with the given
        ``block``/``timeout``.  Putting on a closed queue trips an
        ``assert``.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        # Each lock is released by its own ``finally`` so that a failure
        # while acquiring ``_cond`` cannot leave ``_notempty`` held.
        self._notempty.acquire()
        try:
            self._cond.acquire()
            try:
                if self._thread is None:
                    self._start_thread()
                self._buffer.append(obj)
                self._unfinished_tasks.release()
                self._notempty.notify()
            finally:
                self._cond.release()
        finally:
            self._notempty.release()

    def task_done(self):
        """Mark one previously enqueued task as finished.

        Raises ``ValueError`` if called more times than items were put.
        Notifies all waiters on the condition when the count reaches zero.
        """
        self._cond.acquire()
        try:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()
        finally:
            self._cond.release()

    def join(self):
        """Wait on the condition unless the unfinished-task count is zero."""
        self._cond.acquire()
        try:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
        finally:
            self._cond.release()
3434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
3444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
3454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Simplified Queue type -- really just a locked pipe
3464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao#
3474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao
class SimpleQueue(object):
    """Simplified queue type -- really just a locked pipe.

    ``get`` and ``put`` are not regular methods: ``_make_methods`` builds
    them as closures and stores them on the instance.
    """

    def __init__(self):
        """Create the one-way pipe and the lock(s) guarding its ends."""
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        # No write lock is created on win32.
        self._wlock = None if sys.platform == 'win32' else Lock()
        self._make_methods()

    def empty(self):
        """Return True if the read end of the pipe has nothing to receive."""
        return not self._reader.poll()

    def __getstate__(self):
        """Return the picklable state; only permitted while spawning."""
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pipe ends and locks, then rebuild ``get``/``put``."""
        self._reader, self._writer, self._rlock, self._wlock = state
        self._make_methods()

    def _make_methods(self):
        """Install ``get`` and ``put`` as instance attributes."""
        receive = self._reader.recv
        lock_read = self._rlock.acquire
        unlock_read = self._rlock.release

        def get():
            lock_read()
            try:
                return receive()
            finally:
                unlock_read()

        self.get = get

        write_lock = self._wlock
        if write_lock is not None:
            transmit = self._writer.send
            lock_write = write_lock.acquire
            unlock_write = write_lock.release

            def put(obj):
                lock_write()
                try:
                    return transmit(obj)
                finally:
                    unlock_write()

            self.put = put
        else:
            # writes to a message oriented win32 pipe are atomic
            self.put = self._writer.send
394