14adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 24adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Module implementing queues 34adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 44adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# multiprocessing/queues.py 54adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 64adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Copyright (c) 2006-2008, R Oudkerk 74adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# All rights reserved. 84adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 94adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Redistribution and use in source and binary forms, with or without 104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# modification, are permitted provided that the following conditions 114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# are met: 124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 1. Redistributions of source code must retain the above copyright 144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# notice, this list of conditions and the following disclaimer. 154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 2. Redistributions in binary form must reproduce the above copyright 164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# notice, this list of conditions and the following disclaimer in the 174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# documentation and/or other materials provided with the distribution. 184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 3. Neither the name of author nor the names of any contributors may be 194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# used to endorse or promote products derived from this software 204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# without specific prior written permission. 214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND 234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# SUCH DAMAGE. 334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue'] 364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport sys 384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport os 394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport threading 404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport collections 414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport time 424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport atexit 434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport weakref 444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom Queue import Empty, Full 464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoimport _multiprocessing 474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing import Pipe 484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition 494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing.util import debug, info, Finalize, register_after_fork 504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaofrom multiprocessing.forking import assert_spawning 514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Queue type using a pipe, buffer and thread 544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass Queue(object): 574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __init__(self, maxsize=0): 594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if maxsize <= 0: 604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX 614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._maxsize = maxsize 624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._reader, self._writer = Pipe(duplex=False) 634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._rlock = Lock() 644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._opid = os.getpid() 654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if sys.platform == 'win32': 664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._wlock = None 674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._wlock = Lock() 694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._sem = BoundedSemaphore(maxsize) 704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._after_fork() 724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if sys.platform != 'win32': 744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao register_after_fork(self, Queue._after_fork) 754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __getstate__(self): 774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert_spawning(self) 784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return (self._maxsize, self._reader, self._writer, 794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._rlock, self._wlock, self._sem, self._opid) 804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __setstate__(self, state): 824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao (self._maxsize, self._reader, self._writer, 834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._rlock, self._wlock, self._sem, self._opid) = state 844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._after_fork() 854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _after_fork(self): 874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('Queue._after_fork()') 884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._notempty = threading.Condition(threading.Lock()) 894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._buffer = collections.deque() 904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._thread = None 914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._jointhread = None 924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._joincancelled = False 934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._closed = False 944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._close = None 954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._send = self._writer.send 964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._recv = self._reader.recv 974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._poll = self._reader.poll 984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def put(self, obj, block=True, timeout=None): 1004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert not self._closed 1014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not self._sem.acquire(block, timeout): 1024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao raise Full 1034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._notempty.acquire() 1054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 1064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if self._thread is None: 1074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._start_thread() 1084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._buffer.append(obj) 1094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._notempty.notify() 1104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 1114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._notempty.release() 1124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def get(self, block=True, timeout=None): 1144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if block and timeout is None: 1154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._rlock.acquire() 1164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 1174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao res = self._recv() 1184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._sem.release() 1194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return res 1204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 1214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._rlock.release() 1224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 1244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if block: 1254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao deadline = time.time() + timeout 1264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not self._rlock.acquire(block, timeout): 1274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao raise Empty 1284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 1294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if block: 1304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao timeout = deadline - time.time() 1314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if timeout < 0 or not self._poll(timeout): 1324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao raise Empty 1334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao elif not self._poll(): 1344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao raise Empty 1354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao res = self._recv() 1364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._sem.release() 1374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return res 1384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 1394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._rlock.release() 1404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def qsize(self): 1424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # Raises NotImplementedError on Mac OSX because of broken sem_getvalue() 1434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return self._maxsize - self._sem._semlock._get_value() 1444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def empty(self): 1464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return not self._poll() 1474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def full(self): 1494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return self._sem._semlock._is_zero() 1504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def get_nowait(self): 1524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return self.get(False) 1534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def put_nowait(self, obj): 1554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return self.put(obj, False) 1564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def close(self): 1584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._closed = True 1594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._reader.close() 1604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if self._close: 1614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._close() 1624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def join_thread(self): 1644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('Queue.join_thread()') 1654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert self._closed 1664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if self._jointhread: 1674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._jointhread() 1684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def cancel_join_thread(self): 1704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('Queue.cancel_join_thread()') 1714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._joincancelled = True 1724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 1734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._jointhread.cancel() 1744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except AttributeError: 1754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao pass 1764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _start_thread(self): 1784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('Queue._start_thread()') 1794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # Start thread which transfers data from buffer to pipe 1814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._buffer.clear() 1824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._thread = threading.Thread( 1834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao target=Queue._feed, 1844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao args=(self._buffer, self._notempty, self._send, 1854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._wlock, self._writer.close), 1864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao name='QueueFeederThread' 1874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ) 1884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._thread.daemon = True 1894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('doing self._thread.start()') 1914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._thread.start() 1924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('... done self._thread.start()') 1934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 1944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # On process exit we will wait for data to be flushed to pipe. 1954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not self._joincancelled: 1964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._jointhread = Finalize( 1974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._thread, Queue._finalize_join, 1984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao [weakref.ref(self._thread)], 1994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao exitpriority=-5 2004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ) 2014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # Send sentinel to the thread queue object when garbage collected 2034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._close = Finalize( 2044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self, Queue._finalize_close, 2054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao [self._buffer, self._notempty], 2064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao exitpriority=10 2074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao ) 2084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao @staticmethod 2104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _finalize_join(twr): 2114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('joining queue thread') 2124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao thread = twr() 2134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if thread is not None: 2144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao thread.join() 2154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('... queue thread joined') 2164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 2174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('... queue thread already dead') 2184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao @staticmethod 2204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _finalize_close(buffer, notempty): 2214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('telling queue thread to quit') 2224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao notempty.acquire() 2234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 2244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao buffer.append(_sentinel) 2254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao notempty.notify() 2264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 2274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao notempty.release() 2284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao @staticmethod 2304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _feed(buffer, notempty, send, writelock, close): 2314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('starting thread to feed data to pipe') 2324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao from .util import is_exiting 2334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao nacquire = notempty.acquire 2354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao nrelease = notempty.release 2364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao nwait = notempty.wait 2374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao bpopleft = buffer.popleft 2384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao sentinel = _sentinel 2394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if sys.platform != 'win32': 2404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wacquire = writelock.acquire 2414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wrelease = writelock.release 2424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 2434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wacquire = None 2444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 2464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao while 1: 2474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao nacquire() 2484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 2494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not buffer: 2504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao nwait() 2514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 2524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao nrelease() 2534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 2544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao while 1: 2554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao obj = bpopleft() 2564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if obj is sentinel: 2574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao debug('feeder thread got sentinel -- exiting') 2584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao close() 2594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return 2604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if wacquire is None: 2624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao send(obj) 2634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 2644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wacquire() 2654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 2664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao send(obj) 2674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 2684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wrelease() 2694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except IndexError: 2704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao pass 2714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except Exception, e: 2724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # Since this runs in a daemon thread the resources it uses 2734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # may be become unusable while the process is cleaning up. 2744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # We ignore errors which happen after the process has 2754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # started to cleanup. 2764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 2774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if is_exiting(): 2784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao info('error in queue thread: %s', e) 2794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 2804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao import traceback 2814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao traceback.print_exc() 2824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao except Exception: 2834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao pass 2844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao_sentinel = object() 2864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 2884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# A queue type which also supports join() and task_done() methods 2894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 2904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Note that if you do not call task_done() for each finished task then 2914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# eventually the counter's semaphore may overflow causing Bad Things 2924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# to happen. 2934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 2944adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2954adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass JoinableQueue(Queue): 2964adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 2974adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __init__(self, maxsize=0): 2984adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Queue.__init__(self, maxsize) 2994adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._unfinished_tasks = Semaphore(0) 3004adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond = Condition() 3014adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3024adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __getstate__(self): 3034adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks) 3044adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3054adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __setstate__(self, state): 3064adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao Queue.__setstate__(self, state[:-2]) 3074adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond, self._unfinished_tasks = state[-2:] 3084adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3094adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def put(self, obj, block=True, timeout=None): 3104adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert not self._closed 3114adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not self._sem.acquire(block, timeout): 3124adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao raise Full 3134adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3144adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._notempty.acquire() 3154adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond.acquire() 3164adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3174adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if self._thread is None: 3184adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._start_thread() 3194adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._buffer.append(obj) 3204adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._unfinished_tasks.release() 3214adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._notempty.notify() 3224adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 3234adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond.release() 3244adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._notempty.release() 3254adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3264adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def task_done(self): 3274adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond.acquire() 3284adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3294adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not self._unfinished_tasks.acquire(False): 3304adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao raise ValueError('task_done() called too many times') 3314adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if self._unfinished_tasks._semlock._is_zero(): 3324adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond.notify_all() 3334adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 3344adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond.release() 3354adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3364adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def join(self): 3374adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond.acquire() 3384adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3394adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if not self._unfinished_tasks._semlock._is_zero(): 3404adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond.wait() 3414adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 3424adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._cond.release() 3434adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3444adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 3454adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# Simplified Queue type -- really just a locked pipe 3464adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao# 3474adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3484adfde8bc82dd39f59e0445588c3e599ada477dJosh Gaoclass SimpleQueue(object): 3494adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3504adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __init__(self): 3514adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._reader, self._writer = Pipe(duplex=False) 3524adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._rlock = Lock() 3534adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if sys.platform == 'win32': 3544adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._wlock = None 3554adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 3564adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._wlock = Lock() 3574adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._make_methods() 3584adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3594adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def empty(self): 3604adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return not self._reader.poll() 3614adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3624adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __getstate__(self): 3634adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao assert_spawning(self) 3644adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return (self._reader, self._writer, self._rlock, self._wlock) 3654adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3664adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def __setstate__(self, state): 3674adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao (self._reader, self._writer, self._rlock, self._wlock) = state 3684adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self._make_methods() 3694adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3704adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def _make_methods(self): 3714adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao recv = self._reader.recv 3724adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao racquire, rrelease = self._rlock.acquire, self._rlock.release 3734adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def get(): 3744adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao racquire() 3754adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3764adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return recv() 3774adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 3784adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao rrelease() 3794adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self.get = get 3804adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao 3814adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao if self._wlock is None: 3824adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao # writes to a message oriented win32 pipe are atomic 3834adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self.put = self._writer.send 3844adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao else: 3854adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao send = self._writer.send 3864adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wacquire, wrelease = self._wlock.acquire, self._wlock.release 3874adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao def put(obj): 3884adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wacquire() 3894adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao try: 3904adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao return send(obj) 3914adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao finally: 3924adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao wrelease() 3934adfde8bc82dd39f59e0445588c3e599ada477dJosh Gao self.put = put 394