1import test.support
2
3# Skip tests if _multiprocessing wasn't built.
4test.support.import_module('_multiprocessing')
5# Skip tests if sem_open implementation is broken.
6test.support.import_module('multiprocessing.synchronize')
7# import threading after _multiprocessing to raise a more relevant error
8# message: "No module named _multiprocessing". _multiprocessing is not compiled
9# without thread support.
10test.support.import_module('threading')
11
12from test.support.script_helper import assert_python_ok
13
14import os
15import sys
16import threading
17import time
18import unittest
19import weakref
20
21from concurrent import futures
22from concurrent.futures._base import (
23    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
24from concurrent.futures.process import BrokenProcessPool
25
26
def create_future(state=PENDING, exception=None, result=None):
    """Build a Future whose internal fields are forced to the given values.

    The private ``_state``, ``_exception`` and ``_result`` attributes are
    assigned directly on a fresh ``Future``; none of the public setters are
    called.
    """
    future = Future()
    future._state, future._exception, future._result = (
        state, exception, result)
    return future
33
34
# Pre-built futures, each forced into a fixed state, shared as fixtures by the
# tests in this module.
PENDING_FUTURE = create_future(PENDING)
RUNNING_FUTURE = create_future(RUNNING)
CANCELLED_FUTURE = create_future(CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(FINISHED, result=42)
41
42
def mul(x, y):
    """Return the product ``x * y``."""
    product = x * y
    return product
45
46
def sleep_and_raise(t):
    """Sleep for ``t`` seconds, then unconditionally raise an Exception."""
    time.sleep(t)
    failure = Exception('this is an exception')
    raise failure
50
def sleep_and_print(t, msg):
    """Sleep for ``t`` seconds, then print ``msg`` to stdout and flush it."""
    time.sleep(t)
    print(msg, flush=True)
55
56
class MyObject:
    """Minimal object exposing a do-nothing method."""

    def my_method(self):
        return None
60
61
class ExecutorMixin:
    """Fixture mixin that creates, primes and shuts down ``self.executor``.

    Subclasses provide ``executor_type`` (the executor class to instantiate)
    and may override ``worker_count`` or ``_prime_executor``.
    """

    worker_count = 5

    def setUp(self):
        # Use a monotonic clock: the elapsed-time check in tearDown() must not
        # be distorted by wall-clock adjustments made while the test runs.
        self.t1 = time.monotonic()
        try:
            self.executor = self.executor_type(max_workers=self.worker_count)
        except NotImplementedError as e:
            # The executor type is unavailable here; skip, reporting the
            # message carried by the exception.
            self.skipTest(str(e))
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        dt = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 60, "synchronization issue: test lasted too long")

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        # (The local is not called ``futures`` so that it does not shadow the
        # imported ``futures`` module.)
        warmups = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]

        for f in warmups:
            f.result()
88
89
class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin configured to build a ThreadPoolExecutor."""

    executor_type = futures.ThreadPoolExecutor
92
93
class ProcessPoolMixin(ExecutorMixin):
    """ExecutorMixin configured to build a ProcessPoolExecutor."""

    executor_type = futures.ProcessPoolExecutor
96
97
class ExecutorShutdownTest:
    """Shutdown tests that read ``self.executor`` and ``self.executor_type``."""

    def test_run_after_shutdown(self):
        # Once the executor is shut down, submit() must raise RuntimeError.
        self.executor.shutdown()
        with self.assertRaises(RuntimeError):
            self.executor.submit(pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        script = """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            t = {executor_type}(5)
            t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__)
        rc, out, err = assert_python_ok('-c', script)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_issue12364(self):
        # Queue far more work than is awaited up front, shut down, and then
        # make sure every future still delivers its result.
        pending = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for future in pending:
            future.result()
124
125
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    def _prime_executor(self):
        # No warm-up work is submitted for this suite -- presumably so that
        # test_threads_terminate only sees the threads created by its own
        # three submissions (TODO confirm).
        return None

    def test_threads_terminate(self):
        for operands in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, *operands)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for worker in self.executor._threads:
            worker.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as pool:
            absolutes = list(pool.map(abs, range(-5, 5)))
            self.assertEqual(absolutes,
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        # Leaving the ``with`` block shuts the pool down; its worker threads
        # must then be joinable.
        for worker in pool._threads:
            worker.join()

    def test_del_shutdown(self):
        pool = futures.ThreadPoolExecutor(max_workers=5)
        pool.map(abs, range(-5, 5))
        workers = pool._threads
        # Drop the only reference to the executor, then wait for its threads.
        del pool

        for worker in workers:
            worker.join()

    def test_thread_names_assigned(self):
        pool = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        pool.map(abs, range(-5, 5))
        workers = pool._threads
        del pool

        for worker in workers:
            self.assertRegex(worker.name, r'^SpecialPool_[0-4]$')
            worker.join()

    def test_thread_names_default(self):
        pool = futures.ThreadPoolExecutor(max_workers=5)
        pool.map(abs, range(-5, 5))
        workers = pool._threads
        del pool

        for worker in workers:
            # We don't particularly care what the default name is, just that
            # it has a default name implying that it is a ThreadPoolExecutor
            # followed by what looks like a thread number.
            self.assertRegex(worker.name, r'^.*ThreadPoolExecutor.*_[0-4]$')
            worker.join()
180
181
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    def _prime_executor(self):
        # No warm-up work is submitted for this suite.
        return None

    def test_processes_terminate(self):
        for operands in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, *operands)
        self.assertEqual(len(self.executor._processes), 5)
        # Grab the process table before shutdown() so the workers can still be
        # joined afterwards.
        workers = self.executor._processes
        self.executor.shutdown()

        for process in workers.values():
            process.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as pool:
            workers = pool._processes
            absolutes = list(pool.map(abs, range(-5, 5)))
            self.assertEqual(absolutes,
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for process in workers.values():
            process.join()

    def test_del_shutdown(self):
        pool = futures.ProcessPoolExecutor(max_workers=5)
        list(pool.map(abs, range(-5, 5)))
        manager_thread = pool._queue_management_thread
        workers = pool._processes
        # Drop the only reference to the executor, then wait for its
        # management thread and worker processes to finish.
        del pool

        manager_thread.join()
        for process in workers.values():
            process.join()
216
217
class WaitTests:
    """Tests for futures.wait() that submit their work to ``self.executor``."""

    def test_first_completed(self):
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, quick, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual({quick}, done)
        self.assertEqual({CANCELLED_FUTURE, slow}, not_done)

    def test_first_completed_some_already_completed(self):
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            finished)
        self.assertEqual({slow}, pending)

    def test_first_exception(self):
        succeeds = self.executor.submit(mul, 2, 21)
        raises = self.executor.submit(sleep_and_raise, 1.5)
        outlasts = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [succeeds, raises, outlasts],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({succeeds, raises}, finished)
        self.assertEqual({outlasts}, pending)

    def test_first_exception_some_already_complete(self):
        raises = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             raises, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(
            {SUCCESSFUL_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, raises},
            finished)
        self.assertEqual({CANCELLED_FUTURE, slow}, pending)

    def test_first_exception_one_already_failed(self):
        slow = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, finished)
        self.assertEqual({slow}, pending)

    def test_all_completed(self):
        raises = self.executor.submit(divmod, 2, 0)
        succeeds = self.executor.submit(mul, 2, 21)
        everything = [SUCCESSFUL_FUTURE,
                      CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      raises,
                      succeeds]

        finished, pending = futures.wait(
            everything, return_when=futures.ALL_COMPLETED)

        self.assertEqual(set(everything), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        quick = self.executor.submit(mul, 6, 7)
        # Sleeps longer than the 5 second timeout passed to wait() below.
        too_slow = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             quick, too_slow],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             quick},
            finished)
        self.assertEqual({too_slow}, pending)
317
318
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        gate = threading.Event()

        def block_until_released():
            gate.wait()

        saved_interval = sys.getswitchinterval()
        # Shrink the switch interval for the duration of the test; the
        # original value is restored in the ``finally`` clause.
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(block_until_released)
                  for _ in range(100)}
            gate.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(saved_interval)
335
336
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
    """Concrete test case combining ProcessPoolMixin with WaitTests."""
339
340
class AsCompletedTests:
    # Tests for futures.as_completed() that submit work to ``self.executor``.
    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        first = self.executor.submit(mul, 2, 21)
        second = self.executor.submit(mul, 7, 6)
        everything = [CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      SUCCESSFUL_FUTURE,
                      first, second]

        completed = set(futures.as_completed(everything))
        self.assertEqual(set(everything), completed)

    def test_zero_timeout(self):
        slow = self.executor.submit(time.sleep, 2)
        seen = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     slow],
                    timeout=0):
                seen.add(future)
        except futures.TimeoutError:
            # A timeout is tolerated here; only the futures yielded before it
            # are checked below.
            pass

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE},
            seen)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        slow = self.executor.submit(time.sleep, 2)
        completed = list(futures.as_completed([slow, slow]))
        self.assertEqual(len(completed), 1)
384
385
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
    """Concrete test case combining ThreadPoolMixin with AsCompletedTests."""
388
389
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
    """Concrete test case combining ProcessPoolMixin with AsCompletedTests."""
392
393
class ExecutorTest:
    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        outcome = self.executor.submit(pow, 2, 8).result()
        self.assertEqual(256, outcome)

    def test_submit_keyword(self):
        outcome = self.executor.submit(mul, 2, y=8).result()
        self.assertEqual(16, outcome)

    def test_map(self):
        actual = list(self.executor.map(pow, range(10), range(10)))
        expected = list(map(pow, range(10), range(10)))
        self.assertEqual(actual, expected)

    def test_map_exception(self):
        # The third pair divides by zero; the two results before it must
        # still be delivered.
        results = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(results), (0, 1))
        self.assertEqual(next(results), (0, 1))
        with self.assertRaises(ZeroDivisionError):
            next(results)

    def test_map_timeout(self):
        results = []
        try:
            for value in self.executor.map(time.sleep,
                                           [0, 0, 6],
                                           timeout=5):
                results.append(value)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        collected_event = threading.Event()
        target = MyObject()
        # The weakref object itself must stay referenced for the whole test,
        # otherwise its callback would never be invoked.
        watcher = weakref.ref(
            target, lambda ref: collected_event.set())
        # Deliberately discarding the future.
        self.executor.submit(target.my_method)
        del target

        self.assertTrue(collected_event.wait(timeout=5.0),
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        expected_message = "max_workers must be greater than 0"
        for bad_count in (0, -1):
            with self.assertRaisesRegex(ValueError, expected_message):
                self.executor_type(max_workers=bad_count)
459
460
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []

        # The iterator returned by map() is never consumed; every call must
        # nevertheless have run once shutdown(wait=True) returns.
        self.executor.map(finished.append, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        expected_workers = (os.cpu_count() or 1) * 5
        default_pool = self.executor_type()
        self.assertEqual(default_pool._max_workers, expected_workers)
476
477
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        pending = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        victim = next(iter(self.executor._processes.values()))
        victim.terminate()
        for fut in pending:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        ref = list(map(pow, range(40), range(40)))
        # Chunk sizes below, above and equal to the number of items.
        for chunksize in (6, 50, 40):
            mapped = list(self.executor.map(
                pow, range(40), range(40), chunksize=chunksize))
            self.assertEqual(mapped, ref)

        # A negative chunk size is rejected.
        with self.assertRaises(ValueError):
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

    @classmethod
    def _test_traceback(cls):
        # test_traceback() searches for the exact text of the next source
        # line, trailing comment included -- do not reformat it.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as caught:
            future.result()

        exc = caught.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        remote = exc.__cause__
        self.assertIs(type(remote), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', remote.tb)

        # Re-raise locally and print through sys.excepthook to check that the
        # child's source line also shows up in the rendered traceback.
        with test.support.captured_stderr() as rendered:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      rendered.getvalue())
532
533
534class FutureTests(unittest.TestCase):
535    def test_done_callback_with_result(self):
536        callback_result = None
537        def fn(callback_future):
538            nonlocal callback_result
539            callback_result = callback_future.result()
540
541        f = Future()
542        f.add_done_callback(fn)
543        f.set_result(5)
544        self.assertEqual(5, callback_result)
545
546    def test_done_callback_with_exception(self):
547        callback_exception = None
548        def fn(callback_future):
549            nonlocal callback_exception
550            callback_exception = callback_future.exception()
551
552        f = Future()
553        f.add_done_callback(fn)
554        f.set_exception(Exception('test'))
555        self.assertEqual(('test',), callback_exception.args)
556
557    def test_done_callback_with_cancel(self):
558        was_cancelled = None
559        def fn(callback_future):
560            nonlocal was_cancelled
561            was_cancelled = callback_future.cancelled()
562
563        f = Future()
564        f.add_done_callback(fn)
565        self.assertTrue(f.cancel())
566        self.assertTrue(was_cancelled)
567
568    def test_done_callback_raises(self):
569        with test.support.captured_stderr() as stderr:
570            raising_was_called = False
571            fn_was_called = False
572
573            def raising_fn(callback_future):
574                nonlocal raising_was_called
575                raising_was_called = True
576                raise Exception('doh!')
577
578            def fn(callback_future):
579                nonlocal fn_was_called
580                fn_was_called = True
581
582            f = Future()
583            f.add_done_callback(raising_fn)
584            f.add_done_callback(fn)
585            f.set_result(5)
586            self.assertTrue(raising_was_called)
587            self.assertTrue(fn_was_called)
588            self.assertIn('Exception: doh!', stderr.getvalue())
589
590    def test_done_callback_already_successful(self):
591        callback_result = None
592        def fn(callback_future):
593            nonlocal callback_result
594            callback_result = callback_future.result()
595
596        f = Future()
597        f.set_result(5)
598        f.add_done_callback(fn)
599        self.assertEqual(5, callback_result)
600
601    def test_done_callback_already_failed(self):
602        callback_exception = None
603        def fn(callback_future):
604            nonlocal callback_exception
605            callback_exception = callback_future.exception()
606
607        f = Future()
608        f.set_exception(Exception('test'))
609        f.add_done_callback(fn)
610        self.assertEqual(('test',), callback_exception.args)
611
612    def test_done_callback_already_cancelled(self):
613        was_cancelled = None
614        def fn(callback_future):
615            nonlocal was_cancelled
616            was_cancelled = callback_future.cancelled()
617
618        f = Future()
619        self.assertTrue(f.cancel())
620        f.add_done_callback(fn)
621        self.assertTrue(was_cancelled)
622
623    def test_repr(self):
624        self.assertRegex(repr(PENDING_FUTURE),
625                         '<Future at 0x[0-9a-f]+ state=pending>')
626        self.assertRegex(repr(RUNNING_FUTURE),
627                         '<Future at 0x[0-9a-f]+ state=running>')
628        self.assertRegex(repr(CANCELLED_FUTURE),
629                         '<Future at 0x[0-9a-f]+ state=cancelled>')
630        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
631                         '<Future at 0x[0-9a-f]+ state=cancelled>')
632        self.assertRegex(
633                repr(EXCEPTION_FUTURE),
634                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
635        self.assertRegex(
636                repr(SUCCESSFUL_FUTURE),
637                '<Future at 0x[0-9a-f]+ state=finished returned int>')
638
639
640    def test_cancel(self):
641        f1 = create_future(state=PENDING)
642        f2 = create_future(state=RUNNING)
643        f3 = create_future(state=CANCELLED)
644        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
645        f5 = create_future(state=FINISHED, exception=OSError())
646        f6 = create_future(state=FINISHED, result=5)
647
648        self.assertTrue(f1.cancel())
649        self.assertEqual(f1._state, CANCELLED)
650
651        self.assertFalse(f2.cancel())
652        self.assertEqual(f2._state, RUNNING)
653
654        self.assertTrue(f3.cancel())
655        self.assertEqual(f3._state, CANCELLED)
656
657        self.assertTrue(f4.cancel())
658        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
659
660        self.assertFalse(f5.cancel())
661        self.assertEqual(f5._state, FINISHED)
662
663        self.assertFalse(f6.cancel())
664        self.assertEqual(f6._state, FINISHED)
665
666    def test_cancelled(self):
667        self.assertFalse(PENDING_FUTURE.cancelled())
668        self.assertFalse(RUNNING_FUTURE.cancelled())
669        self.assertTrue(CANCELLED_FUTURE.cancelled())
670        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
671        self.assertFalse(EXCEPTION_FUTURE.cancelled())
672        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
673
674    def test_done(self):
675        self.assertFalse(PENDING_FUTURE.done())
676        self.assertFalse(RUNNING_FUTURE.done())
677        self.assertTrue(CANCELLED_FUTURE.done())
678        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
679        self.assertTrue(EXCEPTION_FUTURE.done())
680        self.assertTrue(SUCCESSFUL_FUTURE.done())
681
682    def test_running(self):
683        self.assertFalse(PENDING_FUTURE.running())
684        self.assertTrue(RUNNING_FUTURE.running())
685        self.assertFalse(CANCELLED_FUTURE.running())
686        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
687        self.assertFalse(EXCEPTION_FUTURE.running())
688        self.assertFalse(SUCCESSFUL_FUTURE.running())
689
690    def test_result_with_timeout(self):
691        self.assertRaises(futures.TimeoutError,
692                          PENDING_FUTURE.result, timeout=0)
693        self.assertRaises(futures.TimeoutError,
694                          RUNNING_FUTURE.result, timeout=0)
695        self.assertRaises(futures.CancelledError,
696                          CANCELLED_FUTURE.result, timeout=0)
697        self.assertRaises(futures.CancelledError,
698                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
699        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
700        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
701
702    def test_result_with_success(self):
703        # TODO(brian@sweetapp.com): This test is timing dependent.
704        def notification():
705            # Wait until the main thread is waiting for the result.
706            time.sleep(1)
707            f1.set_result(42)
708
709        f1 = create_future(state=PENDING)
710        t = threading.Thread(target=notification)
711        t.start()
712
713        self.assertEqual(f1.result(timeout=5), 42)
714
715    def test_result_with_cancel(self):
716        # TODO(brian@sweetapp.com): This test is timing dependent.
717        def notification():
718            # Wait until the main thread is waiting for the result.
719            time.sleep(1)
720            f1.cancel()
721
722        f1 = create_future(state=PENDING)
723        t = threading.Thread(target=notification)
724        t.start()
725
726        self.assertRaises(futures.CancelledError, f1.result, timeout=5)
727
728    def test_exception_with_timeout(self):
729        self.assertRaises(futures.TimeoutError,
730                          PENDING_FUTURE.exception, timeout=0)
731        self.assertRaises(futures.TimeoutError,
732                          RUNNING_FUTURE.exception, timeout=0)
733        self.assertRaises(futures.CancelledError,
734                          CANCELLED_FUTURE.exception, timeout=0)
735        self.assertRaises(futures.CancelledError,
736                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
737        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
738                                   OSError))
739        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
740
741    def test_exception_with_success(self):
742        def notification():
743            # Wait until the main thread is waiting for the exception.
744            time.sleep(1)
745            with f1._condition:
746                f1._state = FINISHED
747                f1._exception = OSError()
748                f1._condition.notify_all()
749
750        f1 = create_future(state=PENDING)
751        t = threading.Thread(target=notification)
752        t.start()
753
754        self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
755
@test.support.reap_threads
def test_main():
    """Run every test in this module, then reap leftover child processes."""
    support = test.support
    try:
        support.run_unittest(__name__)
    finally:
        # Runs whether or not the test run raised.
        support.reap_children()


# Allow the module to be executed directly as a script.
if __name__ == "__main__":
    test_main()
765