"""Regression tests for concurrent.futures executors and Future objects."""

import test.support

# Skip tests if _multiprocessing wasn't built.
test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
test.support.import_module('threading')

from test.support.script_helper import assert_python_ok

import os
import sys
import threading
import time
import unittest
import weakref

from concurrent import futures
from concurrent.futures._base import (
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
from concurrent.futures.process import BrokenProcessPool


def create_future(state=PENDING, exception=None, result=None):
    """Return a Future whose private state fields are forced to the given values.

    The future is not attached to any executor; its ``_state``,
    ``_exception`` and ``_result`` attributes are assigned directly.
    """
    f = Future()
    f._state = state
    f._exception = exception
    f._result = result
    return f


# Pre-built futures, one per state, shared by the tests below.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)


def mul(x, y):
    """Return ``x * y``."""
    return x * y


def sleep_and_raise(t):
    """Sleep for ``t`` seconds, then raise a plain Exception."""
    time.sleep(t)
    raise Exception('this is an exception')


def sleep_and_print(t, msg):
    """Sleep for ``t`` seconds, then print ``msg`` and flush stdout.

    Imported by name from the subprocess script in
    ExecutorShutdownTest.test_interpreter_shutdown.
    """
    time.sleep(t)
    print(msg)
    sys.stdout.flush()


class MyObject:
    """Trivial object with a bound method, used by test_no_stale_references."""

    def my_method(self):
        pass


class ExecutorMixin:
    """Create ``self.executor`` before each test and shut it down afterwards.

    Subclasses provide ``executor_type``.
    """
    worker_count = 5

    def setUp(self):
        # Use a monotonic clock: the elapsed time checked in tearDown() must
        # not be affected by wall-clock adjustments.
        self.t1 = time.monotonic()
        try:
            self.executor = self.executor_type(max_workers=self.worker_count)
        except NotImplementedError as e:
            self.skipTest(str(e))
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        dt = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 60, "synchronization issue: test lasted too long")

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        # (The local is deliberately not called "futures" so that it does not
        # shadow the concurrent.futures module.)
        primers = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]

        for f in primers:
            f.result()


class ThreadPoolMixin(ExecutorMixin):
    executor_type = futures.ThreadPoolExecutor


class ProcessPoolMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor


class ExecutorShutdownTest:
    """Shutdown tests shared by the thread pool and process pool test cases.

    Relies on ``self.executor`` and ``self.executor_type`` from an
    ExecutorMixin subclass.
    """

    def test_run_after_shutdown(self):
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        _, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            t = {executor_type}(5)
            t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_issue12364(self):
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    def _prime_executor(self):
        # No priming: test_threads_terminate asserts on the exact number of
        # threads created by its own submissions.
        pass

    def test_threads_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            # We don't particularly care what the default name is, just that
            # it has a default name implying that it is a ThreadPoolExecutor
            # followed by what looks like a thread number.
            self.assertRegex(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$')
            t.join()


class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
    def _prime_executor(self):
        pass

    def test_processes_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._processes), 5)
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        del executor

        queue_management_thread.join()
        for p in processes.values():
            p.join()


class WaitTests:
    """Tests for futures.wait(); relies on ``self.executor`` from a mixin."""

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, future1, future2],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual({future1}, done)
        self.assertEqual({CANCELLED_FUTURE, future2}, not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
            finished)
        self.assertEqual({future1}, pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [future1, future2, future3],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({future1, future2}, finished)
        self.assertEqual({future3}, pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             future1, future2],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          future1}, finished)
        self.assertEqual({CANCELLED_FUTURE, future2}, pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, future1],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, finished)
        self.assertEqual({future1}, pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             future1,
             future2],
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          future1,
                          future2}, finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          future1}, finished)
        self.assertEqual({future2}, pending)


class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, unittest.TestCase):

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()

        def future_func():
            event.wait()

        oldswitchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for _ in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            # Always restore the interpreter-wide switch interval.
            sys.setswitchinterval(oldswitchinterval)


class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, unittest.TestCase):
    pass


class AsCompletedTests:
    """Tests for futures.as_completed(); relies on ``self.executor``."""

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2]))
        self.assertEqual(
            {CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2},
            completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            # The timeout is tolerated here; only the futures collected
            # before it are checked below.
            pass

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE},
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        future1 = self.executor.submit(time.sleep, 2)
        completed = list(futures.as_completed([future1, future1]))
        self.assertEqual(len(completed), 1)


class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
    pass


class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, unittest.TestCase):
    pass


class ExecutorTest:
    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())

    def test_map(self):
        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10))),
            list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(i), (0, 1))
        self.assertEqual(next(i), (0, 1))
        self.assertRaises(ZeroDivisionError, next, i)

    def test_map_timeout(self):
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        # The weakref object itself must stay referenced until the end of the
        # test, otherwise its callback would never be invoked.
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=5.0)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)


class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, unittest.TestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []

        def record_finished(n):
            finished.append(n)

        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        executor = self.executor_type()
        # Release the extra executor even if the assertion below fails.
        self.addCleanup(executor.shutdown)
        self.assertEqual(executor._max_workers,
                         (os.cpu_count() or 1) * 5)


class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, unittest.TestCase):
    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        submitted = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in submitted:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        # NOTE: test_traceback() searches the remote traceback for the exact
        # text of the next source line, trailing comment included. Do not
        # reformat it.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with test.support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())


class FutureTests(unittest.TestCase):
    def test_done_callback_with_result(self):
        callback_result = None

        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None

        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None

        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        with test.support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = None

        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None

        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None

        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
            repr(EXCEPTION_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
            repr(SUCCESSFUL_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper thread even if the assertion fails, so it does not
        # outlive the test.
        self.addCleanup(t.join)

        self.assertEqual(f1.result(timeout=5), 42)

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper thread even if the assertion fails, so it does not
        # outlive the test.
        self.addCleanup(t.join)

        self.assertRaises(futures.CancelledError, f1.result, timeout=5)

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0), OSError)
        self.assertIsNone(SUCCESSFUL_FUTURE.exception(timeout=0))

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        # Join the helper thread even if the assertion fails, so it does not
        # outlive the test.
        self.addCleanup(t.join)

        self.assertIsInstance(f1.exception(timeout=5), OSError)


@test.support.reap_threads
def test_main():
    """Run every test in this module, then reap leftover child processes."""
    try:
        test.support.run_unittest(__name__)
    finally:
        test.support.reap_children()


if __name__ == "__main__":
    test_main()