#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import atexit
import weakref

from Queue import Empty, Full
import _multiprocessing
from . import Pipe
from .synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from .util import debug, info, Finalize, register_after_fork, is_exiting
from .forking import assert_spawning

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):
    """Queue built from a one-way pipe, a local buffer and a feeder thread.

    put() appends the object to a per-process deque; a background thread
    (started lazily by the first put()) pops objects from that deque and
    sends them through the write end of the pipe.  get() receives
    objects from the read end of the pipe.

    A BoundedSemaphore initialised to ``maxsize`` is acquired by every
    put() and released by every get(), which is what bounds the queue.
    """

    def __init__(self, maxsize=0):
        """Create the pipe, the locks and the size semaphore.

        maxsize -- maximum number of items; any value <= 0 is replaced by
                   _multiprocessing.SemLock.SEM_VALUE_MAX.
        """
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)
        # Held by get() around poll()/recv() on the read end.
        self._rlock = Lock()
        # Pid of the process that created the queue.
        self._opid = os.getpid()
        # On win32 no write lock is used; elsewhere the feeder thread
        # holds this lock around each send().
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = Lock()
        self._sem = BoundedSemaphore(maxsize)

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the picklable state: sizes, pipe ends, locks and pid.

        assert_spawning() is called first; presumably it rejects pickling
        outside of process spawning -- confirm against .forking.
        """
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the shared state and rebuild the per-process state."""
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """(Re)initialise everything that is local to the current process.

        Creates a fresh condition and buffer, forgets any feeder thread
        and finalizers, clears the closed/cancelled flags and caches the
        bound pipe methods.
        """
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Append obj to the local buffer and wake the feeder thread.

        block and timeout are passed to the size semaphore's acquire();
        Full is raised when that acquire fails.  The feeder thread is
        started on the first call.  Asserts that the queue is not closed.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Receive and return one object from the pipe.

        block=True, timeout=None -- take the read lock and wait in recv().
        block=True, timeout=t    -- the read lock and the poll together
                                    must finish within t seconds,
                                    otherwise Empty is raised.
        block=False              -- raise Empty if the read lock cannot
                                    be taken at once or no data is ready.

        The size semaphore is released once an object has been received.
        """
        if block and timeout is None:
            with self._rlock:
                res = self._recv()
                self._sem.release()
                return res

        else:
            if block:
                deadline = time.time() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after waiting for the lock may
                    # be spent polling.
                    timeout = deadline - time.time()
                    if timeout < 0 or not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

    def qsize(self):
        """Return maxsize minus the current value of the size semaphore."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True when the read end of the pipe has nothing to read."""
        return not self._poll()

    def full(self):
        """Return True when the size semaphore has reached zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to put(obj, False)."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the read end and run the close hook.

        The close hook only exists once the feeder thread has been
        started; it is run even if closing the reader raises.
        """
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                # Clear the attribute first so the hook runs at most once
                # through this path.
                self._close = None
                close()

    def join_thread(self):
        """Run the join finalizer, if one was registered.

        Asserts that close() has been called.
        """
        debug('Queue.join_thread()')
        assert self._closed
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being registered."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # No finalizer registered yet (self._jointhread is None).
            pass

    def _start_thread(self):
        """Start the daemon feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send,
                  self._wlock, self._writer.close),
            name='QueueFeederThread'
            )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by the weak reference twr."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to buffer and notify the feeder thread."""
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send, writelock, close):
        """Feeder thread body: move objects from buffer into the pipe.

        buffer    -- deque filled by put()
        notempty  -- condition notified whenever buffer gains an item
        send      -- callable writing one object to the pipe
        writelock -- lock held around send(), unused on win32
        close     -- callable closing the write end of the pipe

        The thread returns after popping the sentinel and calling close().
        An exception raised while waiting or sending is reported and the
        loop carries on with the next object, so a single bad object (for
        example one that cannot be sent) no longer stops every later
        put() from being delivered.
        """
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        # Set once the sentinel has been popped, so that a failure in
        # close() still ends the thread instead of waiting forever.
        stopping = False

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            stopping = True
                            close()
                            return

                        if wacquire is None:
                            send(obj)
                        else:
                            wacquire()
                            try:
                                send(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained; go back to waiting.
                    pass
            except Exception as e:
                # Since this runs in a daemon thread the resources it uses
                # may be become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                try:
                    if is_exiting():
                        info('error in queue thread: %s', e)
                        return
                    else:
                        import traceback
                        traceback.print_exc()
                except Exception:
                    # Reporting itself failed; assume the interpreter is
                    # being torn down and stop rather than loop.
                    return
                if stopping:
                    return
                # NOTE(review): the object that failed is dropped but the
                # slot taken from the size semaphore by put() is not given
                # back here -- confirm whether it should be.

_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#

class JoinableQueue(Queue):
    """Queue that additionally counts unfinished tasks.

    Every put() releases the ``_unfinished_tasks`` semaphore and every
    task_done() acquires it; join() waits on ``_cond`` until that
    semaphore is back at zero.
    """

    def __init__(self, maxsize=0):
        """Create the queue plus the task counter and its condition."""
        Queue.__init__(self, maxsize)
        self._unfinished_tasks = Semaphore(0)
        self._cond = Condition()

    def __getstate__(self):
        """Return Queue's state followed by the condition and the counter."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore Queue's state, then the condition and the counter."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Buffer obj for sending and count it as an unfinished task.

        block and timeout are passed to the size semaphore's acquire();
        Full is raised when that acquire fails.  Asserts that the queue
        is not closed.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        # Nested so that _notempty is released even if taking _cond
        # raises; the locks are released in the reverse order.
        with self._notempty:
            with self._cond:
                if self._thread is None:
                    self._start_thread()
                self._buffer.append(obj)
                self._unfinished_tasks.release()
                self._notempty.notify()

    def task_done(self):
        """Mark one task as finished.

        Raises ValueError when there is no unfinished task left to
        acquire.  Waiters in join() are notified once the counter
        reaches zero.
        """
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        """Wait on the condition unless the task counter is already zero."""
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):
    """One-way pipe whose reads, and off win32 writes, are lock-guarded.

    get() and put() are created per instance by _make_methods().
    """

    def __init__(self):
        """Create the pipe and its locks, then build get() and put()."""
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = Lock()
        self._make_methods()

    def empty(self):
        """Return True when the read end of the pipe has nothing to read."""
        return not self._reader.poll()

    def __getstate__(self):
        """Return the pipe ends and locks.

        assert_spawning() is called first; presumably it rejects pickling
        outside of process spawning -- confirm against .forking.
        """
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pipe ends and locks and rebuild get() and put()."""
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._make_methods()

    def _make_methods(self):
        """Attach get() and put() closures to this instance."""
        recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release
        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()
        self.get = get

        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self.put = self._writer.send
        else:
            send = self._writer.send
            wacquire, wrelease = self._wlock.acquire, self._wlock.release
            def put(obj):
                wacquire()
                try:
                    return send(obj)
                finally:
                    wrelease()
            self.put = put