17f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson#
27f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson# Module implementing queues
37f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson#
47f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson# multiprocessing/queues.py
57f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson#
679af245fb24a3608c341410ab9c782ce930401feR. David Murray# Copyright (c) 2006-2008, R Oudkerk
779af245fb24a3608c341410ab9c782ce930401feR. David Murray# All rights reserved.
879af245fb24a3608c341410ab9c782ce930401feR. David Murray#
979af245fb24a3608c341410ab9c782ce930401feR. David Murray# Redistribution and use in source and binary forms, with or without
1079af245fb24a3608c341410ab9c782ce930401feR. David Murray# modification, are permitted provided that the following conditions
1179af245fb24a3608c341410ab9c782ce930401feR. David Murray# are met:
1279af245fb24a3608c341410ab9c782ce930401feR. David Murray#
1379af245fb24a3608c341410ab9c782ce930401feR. David Murray# 1. Redistributions of source code must retain the above copyright
1479af245fb24a3608c341410ab9c782ce930401feR. David Murray#    notice, this list of conditions and the following disclaimer.
1579af245fb24a3608c341410ab9c782ce930401feR. David Murray# 2. Redistributions in binary form must reproduce the above copyright
1679af245fb24a3608c341410ab9c782ce930401feR. David Murray#    notice, this list of conditions and the following disclaimer in the
1779af245fb24a3608c341410ab9c782ce930401feR. David Murray#    documentation and/or other materials provided with the distribution.
1879af245fb24a3608c341410ab9c782ce930401feR. David Murray# 3. Neither the name of author nor the names of any contributors may be
1979af245fb24a3608c341410ab9c782ce930401feR. David Murray#    used to endorse or promote products derived from this software
2079af245fb24a3608c341410ab9c782ce930401feR. David Murray#    without specific prior written permission.
2179af245fb24a3608c341410ab9c782ce930401feR. David Murray#
2279af245fb24a3608c341410ab9c782ce930401feR. David Murray# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
2379af245fb24a3608c341410ab9c782ce930401feR. David Murray# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
2479af245fb24a3608c341410ab9c782ce930401feR. David Murray# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
2579af245fb24a3608c341410ab9c782ce930401feR. David Murray# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
2679af245fb24a3608c341410ab9c782ce930401feR. David Murray# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
2779af245fb24a3608c341410ab9c782ce930401feR. David Murray# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
2879af245fb24a3608c341410ab9c782ce930401feR. David Murray# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
2979af245fb24a3608c341410ab9c782ce930401feR. David Murray# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
3079af245fb24a3608c341410ab9c782ce930401feR. David Murray# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
3179af245fb24a3608c341410ab9c782ce930401feR. David Murray# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
3279af245fb24a3608c341410ab9c782ce930401feR. David Murray# SUCH DAMAGE.
337f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson#
347f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson
35b2898e0acb7c40c8e77f0210d564eefa4779f24aJesse Noller__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
367f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson
377f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Petersonimport sys
387f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Petersonimport os
397f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Petersonimport threading
407f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Petersonimport collections
417f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Petersonimport time
427f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Petersonimport atexit
437f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Petersonimport weakref
447f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson
457f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Petersonfrom Queue import Empty, Full
467f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Petersonimport _multiprocessing
47233e6988f4b439f160732dd5b55e9d1a06a64b47Serhiy Storchakafrom . import Pipe
48233e6988f4b439f160732dd5b55e9d1a06a64b47Serhiy Storchakafrom .synchronize import Lock, BoundedSemaphore, Semaphore, Condition
49233e6988f4b439f160732dd5b55e9d1a06a64b47Serhiy Storchakafrom .util import debug, info, Finalize, register_after_fork, is_exiting
50233e6988f4b439f160732dd5b55e9d1a06a64b47Serhiy Storchakafrom .forking import assert_spawning
517f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson
#
# Queue type using a pipe, buffer and thread
#
557f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson
class Queue(object):
    """FIFO queue built from a one-way pipe, a local buffer and a feeder thread.

    put() appends the object to an in-process deque; a daemon thread
    (started lazily on the first put) moves buffered objects into the
    write end of the pipe.  get() receives directly from the read end.
    A bounded semaphore initialised to ``maxsize`` counts the free slots:
    put() acquires it and get() releases it.
    """

    def __init__(self, maxsize=0):
        """Create the pipe, locks and slot-counting semaphore.

        A ``maxsize`` of zero or less is replaced by
        ``_multiprocessing.SemLock.SEM_VALUE_MAX``.
        """
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock on win32: _feed() calls send() unlocked there.
            self._wlock = None
        else:
            self._wlock = Lock()
        self._sem = BoundedSemaphore(maxsize)

        # Initialise the per-process state (buffer, condition, thread slots).
        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the shareable state; only permitted while spawning."""
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the shared state and rebuild the per-process state."""
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """Reset everything that is local to the current process.

        Creates a fresh buffer and condition, forgets any feeder thread and
        finalizers, clears the closed/join-cancelled flags and re-binds the
        pipe methods used by put()/get()/empty().
        """
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Buffer ``obj`` for sending; raise Full if no slot can be acquired.

        ``block`` and ``timeout`` are passed to the semaphore's acquire().
        The feeder thread is started on the first call.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()
        finally:
            self._notempty.release()

    def get(self, block=True, timeout=None):
        """Receive one object from the pipe and free its slot.

        With ``block`` true and no ``timeout`` this waits indefinitely.
        Otherwise Empty is raised if the read lock cannot be acquired or
        no data is available (within ``timeout`` seconds when blocking).
        """
        if block and timeout is None:
            self._rlock.acquire()
            try:
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

        else:
            if block:
                deadline = time.time() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after taking the lock may be
                    # spent waiting for data.
                    timeout = deadline - time.time()
                    if timeout < 0 or not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

    def qsize(self):
        """Return maxsize minus the semaphore's current value."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if the read end of the pipe has nothing to receive."""
        return not self._poll()

    def full(self):
        """Return True if the slot semaphore is at zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to put(obj, False)."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the reader and run the close finalizer.

        The finalizer (set by _start_thread) is run even if closing the
        reader raises, and is cleared first so it is run at most once
        from here.
        """
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def join_thread(self):
        """Run the join finalizer, if any; the queue must be closed."""
        debug('Queue.join_thread()')
        assert self._closed
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being registered."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # _jointhread is None when no feeder thread has been started.
            pass

    def _start_thread(self):
        """Start the feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send,
                  self._wlock, self._writer.close),
            name='QueueFeederThread'
            )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the thread behind the weak reference ``twr`` if it is alive."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to ``buffer`` and wake the feeder thread."""
        debug('telling queue thread to quit')
        notempty.acquire()
        try:
            buffer.append(_sentinel)
            notempty.notify()
        finally:
            notempty.release()

    @staticmethod
    def _feed(buffer, notempty, send, writelock, close):
        """Feeder thread body: move objects from ``buffer`` into the pipe.

        Waits on ``notempty`` while the buffer is empty, then pops and
        sends objects until the buffer is drained.  On reading the
        sentinel it calls ``close()`` and returns.  ``writelock`` is held
        around each send, except on win32 where it is not used.
        """
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        try:
            while True:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while True:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        if wacquire is None:
                            send(obj)
                        else:
                            wacquire()
                            try:
                                send(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained; go back to waiting for more items.
                    pass
        except Exception as e:
            # Since this runs in a daemon thread the resources it uses
            # may become unusable while the process is cleaning up.
            # We ignore errors which happen after the process has
            # started to cleanup.
            try:
                if is_exiting():
                    info('error in queue thread: %s', e)
                else:
                    import traceback
                    traceback.print_exc()
            except Exception:
                pass

# Unique marker appended to a queue's buffer to tell its feeder thread to quit.
_sentinel = object()
2887f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson
#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#
2967f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson
class JoinableQueue(Queue):
    """Queue that also counts unfinished tasks, supporting task_done()/join().

    Every put() releases the ``_unfinished_tasks`` semaphore once and every
    task_done() acquires it once; join() waits on ``_cond`` until the
    semaphore is back at zero.
    """

    def __init__(self, maxsize=0):
        """Create the queue plus the task counter and its condition."""
        Queue.__init__(self, maxsize)
        self._unfinished_tasks = Semaphore(0)
        self._cond = Condition()

    def __getstate__(self):
        """Return the Queue state extended with the condition and counter."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the Queue state, then the trailing condition and counter."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Buffer ``obj`` and count it as an unfinished task.

        Raises Full if no slot can be acquired, as Queue.put() does.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            # Acquire _cond inside the outer try so that _notempty is
            # released even if this acquire itself raises.
            self._cond.acquire()
            try:
                if self._thread is None:
                    self._start_thread()
                self._buffer.append(obj)
                self._unfinished_tasks.release()
                self._notempty.notify()
            finally:
                self._cond.release()
        finally:
            self._notempty.release()

    def task_done(self):
        """Mark one task as finished; wake joiners when none remain.

        Raises ValueError if called more times than put().
        """
        self._cond.acquire()
        try:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()
        finally:
            self._cond.release()

    def join(self):
        """Wait on the condition unless the unfinished-task count is zero."""
        self._cond.acquire()
        try:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
        finally:
            self._cond.release()
3457f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson
#
# Simplified Queue type -- really just a locked pipe
#
3497f03ea77bf43257789469b5cbc16982eb0a63b0fBenjamin Peterson
class SimpleQueue(object):
    """Minimal queue: a one-way pipe guarded by a read lock and a write lock.

    get() and put() are built per instance by _make_methods() and stored
    as instance attributes.
    """

    def __init__(self):
        """Create the pipe and locks, then build get()/put()."""
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = Lock()
        self._make_methods()

    def empty(self):
        """Return True if the read end of the pipe has nothing to receive."""
        return not self._reader.poll()

    def __getstate__(self):
        """Return the pipe ends and locks; only permitted while spawning."""
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pipe ends and locks and rebuild get()/put()."""
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._make_methods()

    def _make_methods(self):
        """Bind ``self.get`` and ``self.put`` as closures over the pipe and locks."""
        recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release

        def get():
            # Receive one object while holding the read lock.
            racquire()
            try:
                return recv()
            finally:
                rrelease()
        self.get = get

        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self.put = self._writer.send
        else:
            send = self._writer.send
            wacquire, wrelease = self._wlock.acquire, self._wlock.release

            def put(obj):
                # Send one object while holding the write lock.
                wacquire()
                try:
                    return send(obj)
                finally:
                    wrelease()
            self.put = put
396