1# Some simple queue module tests, plus some failure conditions
2# to ensure the Queue locks remain stable.
3import queue
4import time
5import unittest
6from test import support
7threading = support.import_module('threading')
8
# Capacity given to the bounded queues built by most tests in this file; the
# fill and drain loops in those tests are sized from it as well.
QUEUE_SIZE = 5
10
def qfull(q):
    """Return True if the bounded queue *q* holds at least ``maxsize`` items.

    A queue whose ``maxsize`` is zero or negative is unbounded and is never
    reported as full.  The comparison is ``<=`` rather than ``==`` so that a
    queue whose ``maxsize`` was lowered below its current size still counts
    as full.
    """
    return 0 < q.maxsize <= q.qsize()
13
14# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
    """Thread that calls ``fn(*args)`` after a short delay.

    ``startedEvent`` is set just before ``fn`` is called, so the code that
    started the thread can tell whether the call has been reached yet.
    """

    def __init__(self, fn, args):
        super().__init__()
        self.startedEvent = threading.Event()
        self.fn, self.args = fn, args

    def run(self):
        # The pause is not required for correctness.  It only gives the
        # blocking call in the main thread a chance to really block before
        # this thread releases it.  It has to stay much shorter than the
        # timeouts the tests pass to their blocking calls (around 10
        # seconds), otherwise those calls would time out first; in practice
        # they proceed as soon as self.fn() has run.
        time.sleep(0.1)
        self.startedEvent.set()
        self.fn(*self.args)
34
35
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release.  Returns the result of the blocking function.  Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return.  In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures; one such failure showed up only rarely
# and went undiagnosed for years.  block_func must therefore be unexceptional.
# If block_func is supposed to raise an exception, call
# do_exceptional_blocking_test() instead.
46
class BlockingTestMixin:
    """Helpers that pair a blocking call with a thread that releases it.

    Intended to be mixed into a ``unittest.TestCase``: the helpers report
    problems through ``self.fail()``.  The most recent trigger thread is kept
    in ``self.t``.
    """

    def tearDown(self):
        # Drop the reference to the last trigger thread.
        self.t = None

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Run ``block_func(*block_args)`` while a thread runs the trigger.

        Returns whatever ``block_func`` returns (also stored in
        ``self.result``).  Fails the test if ``block_func`` returned before
        the trigger thread reached its call, or if the trigger thread is
        still alive 10 seconds after ``block_func`` finished.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            self.result = block_func(*block_args)
            # If block_func returned before our thread made the call, we failed!
            if not self.t.startedEvent.is_set():
                self.fail("blocking function '%r' appeared not to block" %
                          block_func)
        finally:
            # Always wait for the trigger thread, including when block_func
            # raised or the check above failed, so that no thread is left
            # running behind the test.
            self.t.join(10) # make sure the thread terminates
            if self.t.is_alive():
                self.fail("trigger function '%r' appeared to not return" %
                          trigger_func)
        return self.result

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
                                   trigger_args, expected_exception_class):
        """Like do_blocking_test(), for a ``block_func`` that must raise.

        The expected exception is re-raised to the caller.  The test fails if
        ``block_func`` returns normally, if the trigger thread does not finish
        within 10 seconds, or if the thread finished without setting its
        started event.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            block_func(*block_args)
        except expected_exception_class:
            raise
        else:
            self.fail("expected exception of kind %r" %
                             expected_exception_class)
        finally:
            self.t.join(10) # make sure the thread terminates
            if self.t.is_alive():
                self.fail("trigger function '%r' appeared to not return" %
                                 trigger_func)
            if not self.t.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")
86
87
class BaseQueueTestMixin(BlockingTestMixin):
    """Queue tests shared by the concrete per-class test cases.

    Subclasses set ``type2test`` to the queue class to instantiate and also
    inherit from ``unittest.TestCase``.
    """

    def setUp(self):
        # Running total maintained by worker(); cumlock serializes the
        # updates made from several worker threads.
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        """Check ordering, full/empty detection and blocking behaviour of *q*.

        *q* must be empty on entry and is filled with QUEUE_SIZE items during
        the test; it is empty again when the method returns.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        # Expected retrieval order, keyed by the name of the queue class.
        target_order = dict(Queue=[111, 333, 222],
                            LifoQueue=[222, 333, 111],
                            PriorityQueue=[111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
            self.assertTrue(q.qsize(), "Queue should not be empty")
        self.assertFalse(qfull(q), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(qfull(q), "Queue should be full")
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        try:
            q.put(full, block=0)
        except queue.Full:
            pass
        else:
            self.fail("Didn't appear to block with a full queue")
        try:
            q.put(full, timeout=0.01)
        except queue.Full:
            pass
        else:
            self.fail("Didn't appear to time-out with a full queue")
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        try:
            q.get(block=0)
        except queue.Empty:
            pass
        else:
            self.fail("Didn't appear to block with an empty queue")
        try:
            q.get(timeout=0.01)
        except queue.Empty:
            pass
        else:
            self.fail("Didn't appear to time-out with an empty queue")
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Add items taken from *q* to ``self.cum`` until a negative item arrives.

        Every item, including the negative stop marker, is acknowledged with
        ``q.task_done()``.
        """
        while True:
            x = q.get()
            if x < 0:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Feed 0..99 to two worker threads and check that q.join() waits for them."""
        self.cum = 0
        threads = []
        for i in (0,1):
            thread = threading.Thread(target=self.worker, args=(q,))
            thread.start()
            threads.append(thread)
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for thread in threads:
            q.put(-1)         # instruct the threads to close
        q.join()                # verify that you can join twice
        # Wait for the workers to exit so no thread outlives the test.
        for thread in threads:
            thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_simple_queue(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)

    def test_negative_timeout_raises_exception(self):
        q = self.type2test(QUEUE_SIZE)
        with self.assertRaises(ValueError):
            q.put(1, timeout=-1)
        with self.assertRaises(ValueError):
            q.get(1, timeout=-1)

    def test_nowait(self):
        q = self.type2test(QUEUE_SIZE)
        for i in range(QUEUE_SIZE):
            q.put_nowait(1)
        with self.assertRaises(queue.Full):
            q.put_nowait(1)

        for i in range(QUEUE_SIZE):
            q.get_nowait()
        with self.assertRaises(queue.Empty):
            q.get_nowait()

    def test_shrinking_queue(self):
        # issue 10110
        q = self.type2test(3)
        q.put(1)
        q.put(2)
        q.put(3)
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
        self.assertEqual(q.qsize(), 3)
        q.maxsize = 2                       # shrink the queue
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
234
class QueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against the FIFO ``queue.Queue``."""

    type2test = queue.Queue
237
class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against ``queue.LifoQueue``."""

    type2test = queue.LifoQueue
240
class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against ``queue.PriorityQueue``."""

    type2test = queue.PriorityQueue
243
244
245
246# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    """Raised by FailingQueue when a put or get has been told to fail."""
249
class FailingQueue(queue.Queue):
    """A ``queue.Queue`` whose next put or get can be made to fail on demand.

    Setting ``fail_next_put`` or ``fail_next_get`` to True makes the next
    ``_put()`` / ``_get()`` raise FailingQueueException.  The flag is cleared
    before the exception is raised, so only a single operation fails.
    """

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments (e.g. maxsize=...) are forwarded
        # unchanged to queue.Queue.
        self.fail_next_put = False
        self.fail_next_get = False
        super().__init__(*args, **kwargs)

    def _put(self, item):
        if self.fail_next_put:
            self.fail_next_put = False
            raise FailingQueueException("You Lose")
        return super()._put(item)

    def _get(self):
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return super()._get()
265
class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
    """Check that a FailingQueue stays usable after a put or get has failed."""

    def _assert_queue_fails(self, func, *args, **kwargs):
        """Call ``func`` and fail the test unless it raises FailingQueueException."""
        try:
            func(*args, **kwargs)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")

    def failing_queue_test(self, q):
        """Drive *q* through failing puts and gets, checking its state after each.

        *q* must be empty on entry and is filled with QUEUE_SIZE items during
        the test; it is empty again when the method returns.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        self._assert_queue_fails(q.put, "oops", block=0)
        q.fail_next_put = True
        self._assert_queue_fails(q.put, "oops", timeout=0.1)
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a failing blocking put.  The put is expected to raise, so the
        # exceptional variant of the blocking helper is the one to use.
        q.fail_next_put = True
        self._assert_queue_fails(self.do_exceptional_blocking_test,
                                 q.put, ("full",), q.get, (),
                                 FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        self._assert_queue_fails(self.do_exceptional_blocking_test,
                                 q.put, ("full", True, 10), q.get, (),
                                 FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        q.get()
        self.assertFalse(qfull(q), "Queue should not be full")
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        self._assert_queue_fails(q.get)
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.fail_next_get = True
        self._assert_queue_fails(q.get, timeout=0.1)
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.fail_next_get = True
        self._assert_queue_fails(self.do_exceptional_blocking_test,
                                 q.get, (), q.put, ('empty',),
                                 FailingQueueException)
        # put succeeded, but get failed.
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)
355
356
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
359