# Regression tests for the cyclic garbage collector exposed by the gc module.
#
# Many assertions below compare gc.collect() / gc.get_count() results against
# exact numbers, so the order and number of objects each test creates matters.

import unittest
from test.support import (verbose, refcount_test, run_unittest,
                          strip_python_stderr, cpython_only, start_threads,
                          temp_dir, requires_type_collecting)
from test.support.script_helper import assert_python_ok, make_script

import sys
import time
import gc
import weakref

try:
    import threading
except ImportError:
    threading = None

try:
    from _testcapi import with_tp_del
except ImportError:
    # Fallback used when _testcapi cannot be imported: the decorator discards
    # the decorated class and returns a stub whose instantiation raises
    # TypeError.
    def with_tp_del(cls):
        class C(object):
            def __new__(cls, *args, **kwargs):
                raise TypeError('requires _testcapi.with_tp_del')
        return C

### Support code
###############################################################################

# Bug 1055820 has several tests of longstanding bugs involving weakrefs and
# cyclic gc.

# An instance of C1055820 has a self-loop, so becomes cyclic trash when
# unreachable.
class C1055820(object):
    def __init__(self, i):
        self.i = i
        self.loop = self

class GC_Detector(object):
    # Create an instance I.  Then gc hasn't happened again so long as
    # I.gc_happened is false.

    def __init__(self):
        self.gc_happened = False

        # Weakref callback: flips the flag on the enclosing detector.
        def it_happened(ignored):
            self.gc_happened = True

        # Create a piece of cyclic trash that triggers it_happened when
        # gc collects it.
        self.wr = weakref.ref(C1055820(666), it_happened)

@with_tp_del
class Uncollectable(object):
    """Create a reference cycle with multiple __del__ methods.

    An object in a reference cycle will never have zero references,
    and so must be garbage collected.  If one or more objects in the
    cycle have __del__ methods, the gc refuses to guess an order,
    and leaves the cycle uncollected."""
    def __init__(self, partner=None):
        # With no partner given, build the second instance and point it back
        # at this one, so the two instances reference each other.
        if partner is None:
            self.partner = Uncollectable(partner=self)
        else:
            self.partner = partner
    def __tp_del__(self):
        pass

### Tests
###############################################################################

class GCTests(unittest.TestCase):
    def test_list(self):
        # A list that contains itself; the test expects collect() to
        # return 1 once the name is deleted.
        l = []
        l.append(l)
        gc.collect()
        del l
        self.assertEqual(gc.collect(), 1)

    def test_dict(self):
        # A dict that contains itself as a value; the test expects collect()
        # to return 1 once the name is deleted.
        d = {}
        d[1] = d
        gc.collect()
        del d
        self.assertEqual(gc.collect(), 1)

    def test_tuple(self):
        # since tuples are immutable we close the loop with a list
        l = []
        t = (l,)
        l.append(t)
        gc.collect()
        del t
        del l
        self.assertEqual(gc.collect(), 2)

    def test_class(self):
        # A class that references itself through a class attribute.
        class A:
            pass
        A.a = A
        gc.collect()
        del A
        self.assertNotEqual(gc.collect(), 0)

    def test_newstyleclass(self):
        # Same as above but for a class deriving explicitly from object and
        # without adding any attribute to it.
        class A(object):
            pass
        gc.collect()
        del A
        self.assertNotEqual(gc.collect(), 0)

    def test_instance(self):
        # An instance that references itself through an attribute.
        class A:
            pass
        a = A()
        a.a = a
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)

    @requires_type_collecting
    def test_newinstance(self):
        # Self-referencing instances of several class shapes (plain, and a
        # list subclass combined with A through multiple inheritance), then
        # the classes themselves.  The final collect() is expected to find
        # nothing left.
        class A(object):
            pass
        a = A()
        a.a = a
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)
        class B(list):
            pass
        class C(B, A):
            pass
        a = C()
        a.a = a
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)
        del B, C
        self.assertNotEqual(gc.collect(), 0)
        A.a = A()
        del A
        self.assertNotEqual(gc.collect(), 0)
        self.assertEqual(gc.collect(), 0)

    def test_method(self):
        # Tricky: self.__init__ is a bound method, it references the instance.
        class A:
            def __init__(self):
                self.init = self.__init__
        a = A()
        gc.collect()
        del a
        self.assertNotEqual(gc.collect(), 0)

    @cpython_only
    def test_legacy_finalizer(self):
        # A() is uncollectable if it is part of a cycle, make sure it shows up
        # in gc.garbage.
        @with_tp_del
        class A:
            def __tp_del__(self): pass
        class B:
            pass
        a = A()
        a.a = a
        # Remember the identity so the object can be recognised in gc.garbage
        # after the name 'a' is gone.
        id_a = id(a)
        b = B()
        b.b = b
        gc.collect()
        del a
        del b
        self.assertNotEqual(gc.collect(), 0)
        for obj in gc.garbage:
            if id(obj) == id_a:
                # Break the self-loop so the object can go away.
                del obj.a
                break
        else:
            self.fail("didn't find obj in garbage (finalizer)")
        gc.garbage.remove(obj)

    @cpython_only
    def test_legacy_finalizer_newclass(self):
        # A() is uncollectable if it is part of a cycle, make sure it shows up
        # in gc.garbage.
        @with_tp_del
        class A(object):
            def __tp_del__(self): pass
        class B(object):
            pass
        a = A()
        a.a = a
        # Remember the identity so the object can be recognised in gc.garbage
        # after the name 'a' is gone.
        id_a = id(a)
        b = B()
        b.b = b
        gc.collect()
        del a
        del b
        self.assertNotEqual(gc.collect(), 0)
        for obj in gc.garbage:
            if id(obj) == id_a:
                # Break the self-loop so the object can go away.
                del obj.a
                break
        else:
            self.fail("didn't find obj in garbage (finalizer)")
        gc.garbage.remove(obj)

    def test_function(self):
        # Tricky: f -> d -> f, code should call d.clear() after the exec to
        # break the cycle.
        d = {}
        exec("def f(): pass\n", d)
        gc.collect()
        del d
        self.assertEqual(gc.collect(), 2)

    @refcount_test
    def test_frame(self):
        # f() stores its own frame object in one of its locals; the test
        # expects exactly one object to be collected after the call.
        def f():
            frame = sys._getframe()
        gc.collect()
        f()
        self.assertEqual(gc.collect(), 1)

    def test_saveall(self):
        # Verify that cyclic garbage like lists show up in gc.garbage if the
        # SAVEALL option is enabled.

        # First make sure we don't save away other stuff that just happens to
        # be waiting for collection.
        gc.collect()
        # if this fails, someone else created immortal trash
        self.assertEqual(gc.garbage, [])

        L = []
        L.append(L)
        id_L = id(L)

        # Turn DEBUG_SAVEALL on only around the collection, then restore the
        # previous debug flags.
        debug = gc.get_debug()
        gc.set_debug(debug | gc.DEBUG_SAVEALL)
        del L
        gc.collect()
        gc.set_debug(debug)

        self.assertEqual(len(gc.garbage), 1)
        obj = gc.garbage.pop()
        self.assertEqual(id(obj), id_L)

    def test_del(self):
        # __del__ methods can trigger collection, make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        # Put the collector back into the state the rest of the suite expects.
        gc.disable()
        gc.set_threshold(*thresholds)

    def test_del_newclass(self):
        # __del__ methods can trigger collection, make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        # Put the collector back into the state the rest of the suite expects.
        gc.disable()
        gc.set_threshold(*thresholds)

    # The following two tests are fragile:
    # They precisely count the number of allocations,
    # which is highly implementation-dependent.
    # For example, disposed tuples are not freed, but reused.
    # To minimize variations, though, we first store the get_count() results
    # and check them at the end.
    @refcount_test
    def test_get_count(self):
        gc.collect()
        a, b, c = gc.get_count()
        x = []
        d, e, f = gc.get_count()
        self.assertEqual((b, c), (0, 0))
        self.assertEqual((e, f), (0, 0))
        # This is less fragile than asserting that a equals 0.
        self.assertLess(a, 5)
        # Between the two calls to get_count(), at least one object was
        # created (the list).
        self.assertGreater(d, a)

    @refcount_test
    def test_collect_generations(self):
        gc.collect()
        # This object will "trickle" into generation N + 1 after
        # each call to collect(N)
        x = []
        gc.collect(0)
        # x is now in gen 1
        a, b, c = gc.get_count()
        gc.collect(1)
        # x is now in gen 2
        d, e, f = gc.get_count()
        gc.collect(2)
        # x is now in gen 3
        g, h, i = gc.get_count()
        # We don't check a, d, g since their exact values depend on
        # internal implementation details of the interpreter.
        self.assertEqual((b, c), (1, 0))
        self.assertEqual((e, f), (0, 1))
        self.assertEqual((h, i), (0, 0))

    def test_trashcan(self):
        # Every 17th Ouch.__del__ call forces a full collection.
        class Ouch:
            n = 0
            def __del__(self):
                Ouch.n = Ouch.n + 1
                if Ouch.n % 17 == 0:
                    gc.collect()

        # "trashcan" is a hack to prevent stack overflow when deallocating
        # very deeply nested tuples etc.  It works in part by abusing the
        # type pointer and refcount fields, and that can yield horrible
        # problems when gc tries to traverse the structures.
        # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
        # most likely die via segfault.

        # Note:  In 2.3 the possibility for compiling without cyclic gc was
        # removed, and that in turn allows the trashcan mechanism to work
        # via much simpler means (e.g., it never abuses the type pointer or
        # refcount fields anymore).  Since it's much less likely to cause a
        # problem now, the various constants in this expensive (we force a lot
        # of full collections) test are cut back from the 2.2 version.
        gc.enable()
        N = 150
        for count in range(2):
            # Each inner loop rebinds the name to a new container wrapping the
            # previous one, producing N levels of nesting.
            t = []
            for i in range(N):
                t = [t, Ouch()]
            u = []
            for i in range(N):
                u = [u, Ouch()]
            v = {}
            for i in range(N):
                v = {1: v, 2: Ouch()}
        gc.disable()

    @unittest.skipUnless(threading, "test meaningless on builds without threads")
    def test_trashcan_threads(self):
        # Issue #13992: trashcan mechanism should be thread-safe
        NESTING = 60
        N_THREADS = 2

        def sleeper_gen():
            """A generator that releases the GIL when closed or dealloc'ed."""
            try:
                yield
            finally:
                time.sleep(0.000001)

        class C(list):
            # Appending to a list is atomic, which avoids the use of a lock.
            inits = []
            dels = []
            def __init__(self, alist):
                self[:] = alist
                C.inits.append(None)
            def __del__(self):
                # This __del__ is called by subtype_dealloc().
                C.dels.append(None)
                # `g` will release the GIL when garbage-collected.  This
                # helps assert subtype_dealloc's behaviour when threads
                # switch in the middle of it.
                g = sleeper_gen()
                next(g)
                # Now that __del__ is finished, subtype_dealloc will proceed
                # to call list_dealloc, which also uses the trashcan mechanism.

        def make_nested():
            """Create a sufficiently nested container object so that the
            trashcan mechanism is invoked when deallocating it."""
            x = C([])
            for i in range(NESTING):
                x = [C([x])]
            del x

        def run_thread():
            """Exercise make_nested() in a loop."""
            # 'exit' is the list bound below; it stays empty (falsy) until the
            # callable handed to start_threads() appends to it.
            while not exit:
                make_nested()

        old_switchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-5)
        try:
            exit = []
            threads = []
            for i in range(N_THREADS):
                t = threading.Thread(target=run_thread)
                threads.append(t)
            with start_threads(threads, lambda: exit.append(1)):
                time.sleep(1.0)
        finally:
            sys.setswitchinterval(old_switchinterval)
        gc.collect()
        # Every C that was initialised must also have been finalised.
        self.assertEqual(len(C.inits), len(C.dels))

    def test_boom(self):
        class Boom:
            def __getattr__(self, someattribute):
                del self.attr
                raise AttributeError

        a = Boom()
        b = Boom()
        a.attr = b
        b.attr = a

        gc.collect()
        garbagelen = len(gc.garbage)
        del a, b
        # a<->b are in a trash cycle now.  Collection will invoke
        # Boom.__getattr__ (to see whether a and b have __del__ methods), and
        # __getattr__ deletes the internal "attr" attributes as a side effect.
        # That causes the trash cycle to get reclaimed via refcounts falling to
        # 0, thus mutating the trash graph as a side effect of merely asking
        # whether __del__ exists.  This used to (before 2.3b1) crash Python.
        # Now __getattr__ isn't called.
        self.assertEqual(gc.collect(), 4)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_boom2(self):
        class Boom2:
            def __init__(self):
                self.x = 0

            def __getattr__(self, someattribute):
                self.x += 1
                if self.x > 1:
                    del self.attr
                raise AttributeError

        a = Boom2()
        b = Boom2()
        a.attr = b
        b.attr = a

        gc.collect()
        garbagelen = len(gc.garbage)
        del a, b
        # Much like test_boom(), except that __getattr__ doesn't break the
        # cycle until the second time gc checks for __del__.  As of 2.3b1,
        # there isn't a second time, so this simply cleans up the trash cycle.
        # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
        # reclaimed this way.
        self.assertEqual(gc.collect(), 4)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_boom_new(self):
        # boom_new and boom2_new are exactly like boom and boom2, except use
        # new-style classes.

        class Boom_New(object):
            def __getattr__(self, someattribute):
                del self.attr
                raise AttributeError

        a = Boom_New()
        b = Boom_New()
        a.attr = b
        b.attr = a

        gc.collect()
        garbagelen = len(gc.garbage)
        del a, b
        self.assertEqual(gc.collect(), 4)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_boom2_new(self):
        # See test_boom2(); this variant derives explicitly from object.
        class Boom2_New(object):
            def __init__(self):
                self.x = 0

            def __getattr__(self, someattribute):
                self.x += 1
                if self.x > 1:
                    del self.attr
                raise AttributeError

        a = Boom2_New()
        b = Boom2_New()
        a.attr = b
        b.attr = a

        gc.collect()
        garbagelen = len(gc.garbage)
        del a, b
        self.assertEqual(gc.collect(), 4)
        self.assertEqual(len(gc.garbage), garbagelen)

    def test_get_referents(self):
        # get_referents() results are sorted before comparison, so the checks
        # do not depend on the order in which referents are reported.
        alist = [1, 3, 5]
        got = gc.get_referents(alist)
        got.sort()
        self.assertEqual(got, alist)

        atuple = tuple(alist)
        got = gc.get_referents(atuple)
        got.sort()
        self.assertEqual(got, alist)

        adict = {1: 3, 5: 7}
        expected = [1, 3, 5, 7]
        got = gc.get_referents(adict)
        got.sort()
        self.assertEqual(got, expected)

        # Several arguments at once: the referents of all of them are
        # returned in a single list.
        got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
        got.sort()
        self.assertEqual(got, [0, 0] + list(range(5)))

        self.assertEqual(gc.get_referents(1, 'a', 4j), [])

    def test_is_tracked(self):
        # Atomic built-in types are not tracked, user-defined objects and
        # mutable containers are.
        # NOTE: types with special optimizations (e.g. tuple) have tests
        # in their own test files instead.
        self.assertFalse(gc.is_tracked(None))
        self.assertFalse(gc.is_tracked(1))
        self.assertFalse(gc.is_tracked(1.0))
        self.assertFalse(gc.is_tracked(1.0 + 5.0j))
        self.assertFalse(gc.is_tracked(True))
        self.assertFalse(gc.is_tracked(False))
        self.assertFalse(gc.is_tracked(b"a"))
        self.assertFalse(gc.is_tracked("a"))
        self.assertFalse(gc.is_tracked(bytearray(b"a")))
        self.assertFalse(gc.is_tracked(type))
        self.assertFalse(gc.is_tracked(int))
        self.assertFalse(gc.is_tracked(object))
        self.assertFalse(gc.is_tracked(object()))

        class UserClass:
            pass

        class UserInt(int):
            pass

        # Base class is object; no extra fields.
        class UserClassSlots:
            __slots__ = ()

        # Base class is fixed size larger than object; no extra fields.
        class UserFloatSlots(float):
            __slots__ = ()

        # Base class is variable size; no extra fields.
        class UserIntSlots(int):
            __slots__ = ()

        self.assertTrue(gc.is_tracked(gc))
        self.assertTrue(gc.is_tracked(UserClass))
        self.assertTrue(gc.is_tracked(UserClass()))
        self.assertTrue(gc.is_tracked(UserInt()))
        self.assertTrue(gc.is_tracked([]))
        self.assertTrue(gc.is_tracked(set()))
        self.assertFalse(gc.is_tracked(UserClassSlots()))
        self.assertFalse(gc.is_tracked(UserFloatSlots()))
        self.assertFalse(gc.is_tracked(UserIntSlots()))

    def test_bug1055820b(self):
        # Corresponds to temp2b.py in the bug report.

        ouch = []
        def callback(ignored):
            ouch[:] = [wr() for wr in WRs]

        Cs = [C1055820(i) for i in range(2)]
        WRs = [weakref.ref(c, callback) for c in Cs]
        c = None

        gc.collect()
        self.assertEqual(len(ouch), 0)
        # Make the two instances trash, and collect again.  The bug was that
        # the callback materialized a strong reference to an instance, but gc
        # cleared the instance's dict anyway.
        Cs = None
        gc.collect()
        self.assertEqual(len(ouch), 2)  # else the callbacks didn't run
        for x in ouch:
            # If the callback resurrected one of these guys, the instance
            # would be damaged, with an empty __dict__.
            self.assertEqual(x, None)

    def test_bug21435(self):
        # This is a poor test - its only virtue is that it happened to
        # segfault on Tim's Windows box before the patch for 21435 was
        # applied.  That's a nasty bug relying on specific pieces of cyclic
        # trash appearing in exactly the right order in finalize_garbage()'s
        # input list.
        # But there's no reliable way to force that order from Python code,
        # so over time chances are good this test won't really be testing much
        # of anything anymore.  Still, if it blows up, there's _some_
        # problem ;-)
        gc.collect()

        class A:
            pass

        class B:
            def __init__(self, x):
                self.x = x

            def __del__(self):
                self.attr = None

        def do_work():
            a = A()
            b = B(A())

            a.attr = b
            b.attr = a

        do_work()
        gc.collect()  # this blows up (bad C pointer) when it fails

    @cpython_only
    def test_garbage_at_shutdown(self):
        import subprocess
        # Script template run in a child interpreter.  '%s' receives the
        # argument for gc.set_debug(); '%%' is a literal percent sign that
        # survives the '%' formatting below.
        code = """if 1:
            import gc
            import _testcapi
            @_testcapi.with_tp_del
            class X:
                def __init__(self, name):
                    self.name = name
                def __repr__(self):
                    return "<X %%r>" %% self.name
                def __tp_del__(self):
                    pass

            x = X('first')
            x.x = x
            x.y = X('second')
            del x
            gc.set_debug(%s)
        """
        # Run 'code' with warnings enabled (-Wd), require a zero exit status
        # and empty stdout, and return stderr filtered through
        # strip_python_stderr().
        def run_command(code):
            p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
            p.stdout.close()
            p.stderr.close()
            self.assertEqual(p.returncode, 0)
            self.assertEqual(stdout.strip(), b"")
            return strip_python_stderr(stderr)

        stderr = run_command(code % "0")
        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
                      b"shutdown; use", stderr)
        self.assertNotIn(b"<X 'first'>", stderr)
        # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
        stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
                      b"shutdown", stderr)
        self.assertTrue(
            (b"[<X 'first'>, <X 'second'>]" in stderr) or
            (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
        # With DEBUG_SAVEALL, no additional message should get printed
        # (because gc.garbage also contains normally reclaimable cyclic
        # references, and its elements get printed at runtime anyway).
        stderr = run_command(code % "gc.DEBUG_SAVEALL")
        self.assertNotIn(b"uncollectable objects at shutdown", stderr)

    @requires_type_collecting
    def test_gc_main_module_at_shutdown(self):
        # Create a reference cycle through the __main__ module and check
        # it gets collected at interpreter shutdown.
        code = """if 1:
            class C:
                def __del__(self):
                    print('__del__ called')
            l = [C()]
            l.append(l)
            """
        rc, out, err = assert_python_ok('-c', code)
        self.assertEqual(out.strip(), b'__del__ called')

    @requires_type_collecting
    def test_gc_ordinary_module_at_shutdown(self):
        # Same as above, but with a non-__main__ module.
        with temp_dir() as script_dir:
            module = """if 1:
                class C:
                    def __del__(self):
                        print('__del__ called')
                l = [C()]
                l.append(l)
                """
            # The child puts script_dir first on sys.path so that
            # 'import gctest' finds the module written by make_script().
            code = """if 1:
                import sys
                sys.path.insert(0, %r)
                import gctest
                """ % (script_dir,)
            make_script(script_dir, 'gctest', module)
            rc, out, err = assert_python_ok('-c', code)
            self.assertEqual(out.strip(), b'__del__ called')

    def test_get_stats(self):
        # get_stats() is expected to return three dicts, each with
        # exactly the keys checked below and non-negative values.
        stats = gc.get_stats()
        self.assertEqual(len(stats), 3)
        for st in stats:
            self.assertIsInstance(st, dict)
            self.assertEqual(set(st),
                             {"collected", "collections", "uncollectable"})
            self.assertGreaterEqual(st["collected"], 0)
            self.assertGreaterEqual(st["collections"], 0)
            self.assertGreaterEqual(st["uncollectable"], 0)
        # Check that collection counts are incremented correctly
        if gc.isenabled():
            # Keep automatic collections from disturbing the counters, and
            # re-enable gc when the test finishes.
            self.addCleanup(gc.enable)
            gc.disable()
        old = gc.get_stats()
        gc.collect(0)
        new = gc.get_stats()
        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
        self.assertEqual(new[1]["collections"], old[1]["collections"])
        self.assertEqual(new[2]["collections"], old[2]["collections"])
        gc.collect(2)
        new = gc.get_stats()
        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
        self.assertEqual(new[1]["collections"], old[1]["collections"])
        self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)


class GCCallbackTests(unittest.TestCase):
    def setUp(self):
        # Save gc state and disable it.
        self.enabled = gc.isenabled()
        gc.disable()
        self.debug = gc.get_debug()
        gc.set_debug(0)
        gc.callbacks.append(self.cb1)
        gc.callbacks.append(self.cb2)
        self.othergarbage = []

    def tearDown(self):
        # Restore gc state
        # NOTE: self.visit is only created by preclean(), so every test in
        # this class has to call preclean() for this 'del' to succeed.
        del self.visit
        gc.callbacks.remove(self.cb1)
        gc.callbacks.remove(self.cb2)
        gc.set_debug(self.debug)
        if self.enabled:
            gc.enable()
        # destroy any uncollectables
        gc.collect()
        for obj in gc.garbage:
            if isinstance(obj, Uncollectable):
                obj.partner = None
        del gc.garbage[:]
        del self.othergarbage
        gc.collect()

    def preclean(self):
        # Remove all fluff from the system.  Invoke this function
        # manually rather than through self.setUp() for maximum
        # safety.
        self.visit = []
        gc.collect()
        # Move whatever is currently in gc.garbage into self.othergarbage and
        # leave gc.garbage empty.
        garbage, gc.garbage[:] = gc.garbage[:], []
        self.othergarbage.append(garbage)
        # Forget the callback records produced by the collect() above.
        self.visit = []

    def cb1(self, phase, info):
        # Record (callback id, phase, copy of the info dict).
        self.visit.append((1, phase, dict(info)))

    def cb2(self, phase, info):
        # Record (callback id, phase, copy of the info dict).
        self.visit.append((2, phase, dict(info)))
        # The cleanup step only runs once a test has set self.cleanup.
        if phase == "stop" and hasattr(self, "cleanup"):
            # Clean Uncollectable from garbage
            uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
            gc.garbage[:] = [e for e in gc.garbage
                             if not isinstance(e, Uncollectable)]
            for e in uc:
                e.partner = None

    def test_collect(self):
        self.preclean()
        gc.collect()
        # Algorithmically verify the contents of self.visit
        # because it is long and tortuous.

        # Count the number of visits to each callback
        n = [v[0] for v in self.visit]
        n1 = [i for i in n if i == 1]
        n2 = [i for i in n if i == 2]
        self.assertEqual(n1, [1]*2)
        self.assertEqual(n2, [2]*2)

        # Count that we got the right number of start and stop callbacks.
        n = [v[1] for v in self.visit]
        n1 = [i for i in n if i == "start"]
        n2 = [i for i in n if i == "stop"]
        self.assertEqual(n1, ["start"]*2)
        self.assertEqual(n2, ["stop"]*2)

        # Check that we got the right info dict for all callbacks
        for v in self.visit:
            info = v[2]
            self.assertTrue("generation" in info)
            self.assertTrue("collected" in info)
            self.assertTrue("uncollectable" in info)

    def test_collect_generation(self):
        # The generation passed to collect() must be reported to callbacks.
        self.preclean()
        gc.collect(2)
        for v in self.visit:
            info = v[2]
            self.assertEqual(info["generation"], 2)

    @cpython_only
    def test_collect_garbage(self):
        self.preclean()
        # Each of these causes four objects to be garbage: Two
        # Uncollectables and their instance dicts.
        Uncollectable()
        Uncollectable()
        C1055820(666)
        gc.collect()
        # Only the "stop" records are checked for the counters.
        for v in self.visit:
            if v[1] != "stop":
                continue
            info = v[2]
            self.assertEqual(info["collected"], 2)
            self.assertEqual(info["uncollectable"], 8)

        # We should now have the Uncollectables in gc.garbage
        self.assertEqual(len(gc.garbage), 4)
        for e in gc.garbage:
            self.assertIsInstance(e, Uncollectable)

        # Now, let our callback handle the Uncollectable instances
        self.cleanup=True
        self.visit = []
        gc.garbage[:] = []
        gc.collect()
        for v in self.visit:
            if v[1] != "stop":
                continue
            info = v[2]
            self.assertEqual(info["collected"], 0)
            self.assertEqual(info["uncollectable"], 4)

        # Uncollectables should be gone
        self.assertEqual(len(gc.garbage), 0)


class GCTogglingTests(unittest.TestCase):
    # These tests wait for an automatic collection, so gc is enabled for each
    # test and disabled again afterwards (test_main() runs the suite with gc
    # disabled).
    def setUp(self):
        gc.enable()

    def tearDown(self):
        gc.disable()

    def test_bug1055820c(self):
        # Corresponds to temp2c.py in the bug report.  This is pretty
        # elaborate.

        c0 = C1055820(0)
        # Move c0 into generation 2.
        gc.collect()

        c1 = C1055820(1)
        c1.keep_c0_alive = c0
        del c0.loop # now only c1 keeps c0 alive

        c2 = C1055820(2)
        c2wr = weakref.ref(c2) # no callback!

        ouch = []
        def callback(ignored):
            ouch[:] = [c2wr()]

        # The callback gets associated with a wr on an object in generation 2.
        c0wr = weakref.ref(c0, callback)

        c0 = c1 = c2 = None

        # What we've set up:  c0, c1, and c2 are all trash now.  c0 is in
        # generation 2.  The only thing keeping it alive is that c1 points to
        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's a
        # global weakref to c2 (c2wr), but that weakref has no callback.
        # There's also a global weakref to c0 (c0wr), and that does have a
        # callback, and that callback references c2 via c2wr().
        #
        #               c0 has a wr with callback, which references c2wr
        #               ^
        #               |
        #               |     Generation 2 above dots
        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
        #               |     Generation 0 below dots
        #               |
        #               |
        #            ^->c1   ^->c2 has a wr but no callback
        #            |  |    |  |
        #            <--v    <--v
        #
        # So this is the nightmare:  when generation 0 gets collected, we see
        # that c2 has a callback-free weakref, and c1 doesn't even have a
        # weakref.  Collecting generation 0 doesn't see c0 at all, and c0 is
        # the only object that has a weakref with a callback.  gc clears c1
        # and c2.  Clearing c1 has the side effect of dropping the refcount on
        # c0 to 0, so c0 goes away (despite that it's in an older generation)
        # and c0's wr callback triggers.  That in turn materializes a reference
        # to c2 via c2wr(), but c2 gets cleared anyway by gc.

        # We want to let gc happen "naturally", to preserve the distinction
        # between generations.
        junk = []
        i = 0
        detector = GC_Detector()
        while not detector.gc_happened:
            i += 1
            if i > 10000:
                self.fail("gc didn't happen after 10000 iterations")
            self.assertEqual(len(ouch), 0)
            junk.append([])  # this will eventually trigger gc

        self.assertEqual(len(ouch), 1)  # else the callback wasn't invoked
        for x in ouch:
            # If the callback resurrected c2, the instance would be damaged,
            # with an empty __dict__.
            self.assertEqual(x, None)

    def test_bug1055820d(self):
        # Corresponds to temp2d.py in the bug report.  This is very much like
        # test_bug1055820c, but uses a __del__ method instead of a weakref
        # callback to sneak in a resurrection of cyclic trash.

        ouch = []
        class D(C1055820):
            def __del__(self):
                ouch[:] = [c2wr()]

        d0 = D(0)
        # Move all the above into generation 2.
        gc.collect()

        c1 = C1055820(1)
        c1.keep_d0_alive = d0
        del d0.loop # now only c1 keeps d0 alive

        c2 = C1055820(2)
        c2wr = weakref.ref(c2) # no callback!

        d0 = c1 = c2 = None

        # What we've set up:  d0, c1, and c2 are all trash now.  d0 is in
        # generation 2.  The only thing keeping it alive is that c1 points to
        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's
        # a global weakref to c2 (c2wr), but that weakref has no callback.
        # There are no other weakrefs.
        #
        #               d0 has a __del__ method that references c2wr
        #               ^
        #               |
        #               |     Generation 2 above dots
        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
        #               |     Generation 0 below dots
        #               |
        #               |
        #            ^->c1   ^->c2 has a wr but no callback
        #            |  |    |  |
        #            <--v    <--v
        #
        # So this is the nightmare:  when generation 0 gets collected, we see
        # that c2 has a callback-free weakref, and c1 doesn't even have a
        # weakref.  Collecting generation 0 doesn't see d0 at all.  gc clears
        # c1 and c2.  Clearing c1 has the side effect of dropping the refcount
        # on d0 to 0, so d0 goes away (despite that it's in an older
        # generation) and d0's __del__ triggers.  That in turn materializes
        # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.

        # We want to let gc happen "naturally", to preserve the distinction
        # between generations.
        detector = GC_Detector()
        junk = []
        i = 0
        while not detector.gc_happened:
            i += 1
            if i > 10000:
                self.fail("gc didn't happen after 10000 iterations")
            self.assertEqual(len(ouch), 0)
            junk.append([])  # this will eventually trigger gc

        self.assertEqual(len(ouch), 1)  # else __del__ wasn't invoked
        for x in ouch:
            # If __del__ resurrected c2, the instance would be damaged, with an
            # empty __dict__.
            self.assertEqual(x, None)

# Entry point: runs the three test classes with automatic collection disabled
# and DEBUG_LEAK masked out, then restores the debug flags and the
# enabled/disabled state found on entry.
def test_main():
    enabled = gc.isenabled()
    gc.disable()
    assert not gc.isenabled()
    debug = gc.get_debug()
    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak

    try:
        gc.collect() # Delete 2nd generation garbage
        run_unittest(GCTests, GCTogglingTests, GCCallbackTests)
    finally:
        gc.set_debug(debug)
        # test gc.enable() even if GC is disabled by default
        if verbose:
            print("restoring automatic collection")
        # make sure to always test gc.enable()
        gc.enable()
        assert gc.isenabled()
        if not enabled:
            gc.disable()

if __name__ == "__main__":
    test_main()