1import unittest
2from test.support import (verbose, refcount_test, run_unittest,
3                          strip_python_stderr, cpython_only, start_threads,
4                          temp_dir, requires_type_collecting)
5from test.support.script_helper import assert_python_ok, make_script
6
7import sys
8import time
9import gc
10import weakref
11
12try:
13    import threading
14except ImportError:
15    threading = None
16
17try:
18    from _testcapi import with_tp_del
19except ImportError:
20    def with_tp_del(cls):
21        class C(object):
22            def __new__(cls, *args, **kwargs):
23                raise TypeError('requires _testcapi.with_tp_del')
24        return C
25
26### Support code
27###############################################################################
28
29# Bug 1055820 has several tests of longstanding bugs involving weakrefs and
30# cyclic gc.
31
32# An instance of C1055820 has a self-loop, so becomes cyclic trash when
33# unreachable.
34class C1055820(object):
35    def __init__(self, i):
36        self.i = i
37        self.loop = self
38
39class GC_Detector(object):
40    # Create an instance I.  Then gc hasn't happened again so long as
41    # I.gc_happened is false.
42
43    def __init__(self):
44        self.gc_happened = False
45
46        def it_happened(ignored):
47            self.gc_happened = True
48
49        # Create a piece of cyclic trash that triggers it_happened when
50        # gc collects it.
51        self.wr = weakref.ref(C1055820(666), it_happened)
52
53@with_tp_del
54class Uncollectable(object):
55    """Create a reference cycle with multiple __del__ methods.
56
57    An object in a reference cycle will never have zero references,
58    and so must be garbage collected.  If one or more objects in the
59    cycle have __del__ methods, the gc refuses to guess an order,
60    and leaves the cycle uncollected."""
61    def __init__(self, partner=None):
62        if partner is None:
63            self.partner = Uncollectable(partner=self)
64        else:
65            self.partner = partner
66    def __tp_del__(self):
67        pass
68
69### Tests
70###############################################################################
71
72class GCTests(unittest.TestCase):
73    def test_list(self):
74        l = []
75        l.append(l)
76        gc.collect()
77        del l
78        self.assertEqual(gc.collect(), 1)
79
80    def test_dict(self):
81        d = {}
82        d[1] = d
83        gc.collect()
84        del d
85        self.assertEqual(gc.collect(), 1)
86
87    def test_tuple(self):
88        # since tuples are immutable we close the loop with a list
89        l = []
90        t = (l,)
91        l.append(t)
92        gc.collect()
93        del t
94        del l
95        self.assertEqual(gc.collect(), 2)
96
97    def test_class(self):
98        class A:
99            pass
100        A.a = A
101        gc.collect()
102        del A
103        self.assertNotEqual(gc.collect(), 0)
104
105    def test_newstyleclass(self):
106        class A(object):
107            pass
108        gc.collect()
109        del A
110        self.assertNotEqual(gc.collect(), 0)
111
112    def test_instance(self):
113        class A:
114            pass
115        a = A()
116        a.a = a
117        gc.collect()
118        del a
119        self.assertNotEqual(gc.collect(), 0)
120
121    @requires_type_collecting
122    def test_newinstance(self):
123        class A(object):
124            pass
125        a = A()
126        a.a = a
127        gc.collect()
128        del a
129        self.assertNotEqual(gc.collect(), 0)
130        class B(list):
131            pass
132        class C(B, A):
133            pass
134        a = C()
135        a.a = a
136        gc.collect()
137        del a
138        self.assertNotEqual(gc.collect(), 0)
139        del B, C
140        self.assertNotEqual(gc.collect(), 0)
141        A.a = A()
142        del A
143        self.assertNotEqual(gc.collect(), 0)
144        self.assertEqual(gc.collect(), 0)
145
146    def test_method(self):
147        # Tricky: self.__init__ is a bound method, it references the instance.
148        class A:
149            def __init__(self):
150                self.init = self.__init__
151        a = A()
152        gc.collect()
153        del a
154        self.assertNotEqual(gc.collect(), 0)
155
156    @cpython_only
157    def test_legacy_finalizer(self):
158        # A() is uncollectable if it is part of a cycle, make sure it shows up
159        # in gc.garbage.
160        @with_tp_del
161        class A:
162            def __tp_del__(self): pass
163        class B:
164            pass
165        a = A()
166        a.a = a
167        id_a = id(a)
168        b = B()
169        b.b = b
170        gc.collect()
171        del a
172        del b
173        self.assertNotEqual(gc.collect(), 0)
174        for obj in gc.garbage:
175            if id(obj) == id_a:
176                del obj.a
177                break
178        else:
179            self.fail("didn't find obj in garbage (finalizer)")
180        gc.garbage.remove(obj)
181
182    @cpython_only
183    def test_legacy_finalizer_newclass(self):
184        # A() is uncollectable if it is part of a cycle, make sure it shows up
185        # in gc.garbage.
186        @with_tp_del
187        class A(object):
188            def __tp_del__(self): pass
189        class B(object):
190            pass
191        a = A()
192        a.a = a
193        id_a = id(a)
194        b = B()
195        b.b = b
196        gc.collect()
197        del a
198        del b
199        self.assertNotEqual(gc.collect(), 0)
200        for obj in gc.garbage:
201            if id(obj) == id_a:
202                del obj.a
203                break
204        else:
205            self.fail("didn't find obj in garbage (finalizer)")
206        gc.garbage.remove(obj)
207
208    def test_function(self):
209        # Tricky: f -> d -> f, code should call d.clear() after the exec to
210        # break the cycle.
211        d = {}
212        exec("def f(): pass\n", d)
213        gc.collect()
214        del d
215        self.assertEqual(gc.collect(), 2)
216
217    @refcount_test
218    def test_frame(self):
219        def f():
220            frame = sys._getframe()
221        gc.collect()
222        f()
223        self.assertEqual(gc.collect(), 1)
224
225    def test_saveall(self):
226        # Verify that cyclic garbage like lists show up in gc.garbage if the
227        # SAVEALL option is enabled.
228
229        # First make sure we don't save away other stuff that just happens to
230        # be waiting for collection.
231        gc.collect()
232        # if this fails, someone else created immortal trash
233        self.assertEqual(gc.garbage, [])
234
235        L = []
236        L.append(L)
237        id_L = id(L)
238
239        debug = gc.get_debug()
240        gc.set_debug(debug | gc.DEBUG_SAVEALL)
241        del L
242        gc.collect()
243        gc.set_debug(debug)
244
245        self.assertEqual(len(gc.garbage), 1)
246        obj = gc.garbage.pop()
247        self.assertEqual(id(obj), id_L)
248
249    def test_del(self):
250        # __del__ methods can trigger collection, make this to happen
251        thresholds = gc.get_threshold()
252        gc.enable()
253        gc.set_threshold(1)
254
255        class A:
256            def __del__(self):
257                dir(self)
258        a = A()
259        del a
260
261        gc.disable()
262        gc.set_threshold(*thresholds)
263
264    def test_del_newclass(self):
265        # __del__ methods can trigger collection, make this to happen
266        thresholds = gc.get_threshold()
267        gc.enable()
268        gc.set_threshold(1)
269
270        class A(object):
271            def __del__(self):
272                dir(self)
273        a = A()
274        del a
275
276        gc.disable()
277        gc.set_threshold(*thresholds)
278
279    # The following two tests are fragile:
280    # They precisely count the number of allocations,
281    # which is highly implementation-dependent.
282    # For example, disposed tuples are not freed, but reused.
283    # To minimize variations, though, we first store the get_count() results
284    # and check them at the end.
285    @refcount_test
286    def test_get_count(self):
287        gc.collect()
288        a, b, c = gc.get_count()
289        x = []
290        d, e, f = gc.get_count()
291        self.assertEqual((b, c), (0, 0))
292        self.assertEqual((e, f), (0, 0))
293        # This is less fragile than asserting that a equals 0.
294        self.assertLess(a, 5)
295        # Between the two calls to get_count(), at least one object was
296        # created (the list).
297        self.assertGreater(d, a)
298
299    @refcount_test
300    def test_collect_generations(self):
301        gc.collect()
302        # This object will "trickle" into generation N + 1 after
303        # each call to collect(N)
304        x = []
305        gc.collect(0)
306        # x is now in gen 1
307        a, b, c = gc.get_count()
308        gc.collect(1)
309        # x is now in gen 2
310        d, e, f = gc.get_count()
311        gc.collect(2)
312        # x is now in gen 3
313        g, h, i = gc.get_count()
314        # We don't check a, d, g since their exact values depends on
315        # internal implementation details of the interpreter.
316        self.assertEqual((b, c), (1, 0))
317        self.assertEqual((e, f), (0, 1))
318        self.assertEqual((h, i), (0, 0))
319
320    def test_trashcan(self):
321        class Ouch:
322            n = 0
323            def __del__(self):
324                Ouch.n = Ouch.n + 1
325                if Ouch.n % 17 == 0:
326                    gc.collect()
327
328        # "trashcan" is a hack to prevent stack overflow when deallocating
329        # very deeply nested tuples etc.  It works in part by abusing the
330        # type pointer and refcount fields, and that can yield horrible
331        # problems when gc tries to traverse the structures.
332        # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
333        # most likely die via segfault.
334
335        # Note:  In 2.3 the possibility for compiling without cyclic gc was
336        # removed, and that in turn allows the trashcan mechanism to work
337        # via much simpler means (e.g., it never abuses the type pointer or
338        # refcount fields anymore).  Since it's much less likely to cause a
339        # problem now, the various constants in this expensive (we force a lot
340        # of full collections) test are cut back from the 2.2 version.
341        gc.enable()
342        N = 150
343        for count in range(2):
344            t = []
345            for i in range(N):
346                t = [t, Ouch()]
347            u = []
348            for i in range(N):
349                u = [u, Ouch()]
350            v = {}
351            for i in range(N):
352                v = {1: v, 2: Ouch()}
353        gc.disable()
354
355    @unittest.skipUnless(threading, "test meaningless on builds without threads")
356    def test_trashcan_threads(self):
357        # Issue #13992: trashcan mechanism should be thread-safe
358        NESTING = 60
359        N_THREADS = 2
360
361        def sleeper_gen():
362            """A generator that releases the GIL when closed or dealloc'ed."""
363            try:
364                yield
365            finally:
366                time.sleep(0.000001)
367
368        class C(list):
369            # Appending to a list is atomic, which avoids the use of a lock.
370            inits = []
371            dels = []
372            def __init__(self, alist):
373                self[:] = alist
374                C.inits.append(None)
375            def __del__(self):
376                # This __del__ is called by subtype_dealloc().
377                C.dels.append(None)
378                # `g` will release the GIL when garbage-collected.  This
379                # helps assert subtype_dealloc's behaviour when threads
380                # switch in the middle of it.
381                g = sleeper_gen()
382                next(g)
383                # Now that __del__ is finished, subtype_dealloc will proceed
384                # to call list_dealloc, which also uses the trashcan mechanism.
385
386        def make_nested():
387            """Create a sufficiently nested container object so that the
388            trashcan mechanism is invoked when deallocating it."""
389            x = C([])
390            for i in range(NESTING):
391                x = [C([x])]
392            del x
393
394        def run_thread():
395            """Exercise make_nested() in a loop."""
396            while not exit:
397                make_nested()
398
399        old_switchinterval = sys.getswitchinterval()
400        sys.setswitchinterval(1e-5)
401        try:
402            exit = []
403            threads = []
404            for i in range(N_THREADS):
405                t = threading.Thread(target=run_thread)
406                threads.append(t)
407            with start_threads(threads, lambda: exit.append(1)):
408                time.sleep(1.0)
409        finally:
410            sys.setswitchinterval(old_switchinterval)
411        gc.collect()
412        self.assertEqual(len(C.inits), len(C.dels))
413
414    def test_boom(self):
415        class Boom:
416            def __getattr__(self, someattribute):
417                del self.attr
418                raise AttributeError
419
420        a = Boom()
421        b = Boom()
422        a.attr = b
423        b.attr = a
424
425        gc.collect()
426        garbagelen = len(gc.garbage)
427        del a, b
428        # a<->b are in a trash cycle now.  Collection will invoke
429        # Boom.__getattr__ (to see whether a and b have __del__ methods), and
430        # __getattr__ deletes the internal "attr" attributes as a side effect.
431        # That causes the trash cycle to get reclaimed via refcounts falling to
432        # 0, thus mutating the trash graph as a side effect of merely asking
433        # whether __del__ exists.  This used to (before 2.3b1) crash Python.
434        # Now __getattr__ isn't called.
435        self.assertEqual(gc.collect(), 4)
436        self.assertEqual(len(gc.garbage), garbagelen)
437
438    def test_boom2(self):
439        class Boom2:
440            def __init__(self):
441                self.x = 0
442
443            def __getattr__(self, someattribute):
444                self.x += 1
445                if self.x > 1:
446                    del self.attr
447                raise AttributeError
448
449        a = Boom2()
450        b = Boom2()
451        a.attr = b
452        b.attr = a
453
454        gc.collect()
455        garbagelen = len(gc.garbage)
456        del a, b
457        # Much like test_boom(), except that __getattr__ doesn't break the
458        # cycle until the second time gc checks for __del__.  As of 2.3b1,
459        # there isn't a second time, so this simply cleans up the trash cycle.
460        # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
461        # reclaimed this way.
462        self.assertEqual(gc.collect(), 4)
463        self.assertEqual(len(gc.garbage), garbagelen)
464
465    def test_boom_new(self):
466        # boom__new and boom2_new are exactly like boom and boom2, except use
467        # new-style classes.
468
469        class Boom_New(object):
470            def __getattr__(self, someattribute):
471                del self.attr
472                raise AttributeError
473
474        a = Boom_New()
475        b = Boom_New()
476        a.attr = b
477        b.attr = a
478
479        gc.collect()
480        garbagelen = len(gc.garbage)
481        del a, b
482        self.assertEqual(gc.collect(), 4)
483        self.assertEqual(len(gc.garbage), garbagelen)
484
485    def test_boom2_new(self):
486        class Boom2_New(object):
487            def __init__(self):
488                self.x = 0
489
490            def __getattr__(self, someattribute):
491                self.x += 1
492                if self.x > 1:
493                    del self.attr
494                raise AttributeError
495
496        a = Boom2_New()
497        b = Boom2_New()
498        a.attr = b
499        b.attr = a
500
501        gc.collect()
502        garbagelen = len(gc.garbage)
503        del a, b
504        self.assertEqual(gc.collect(), 4)
505        self.assertEqual(len(gc.garbage), garbagelen)
506
507    def test_get_referents(self):
508        alist = [1, 3, 5]
509        got = gc.get_referents(alist)
510        got.sort()
511        self.assertEqual(got, alist)
512
513        atuple = tuple(alist)
514        got = gc.get_referents(atuple)
515        got.sort()
516        self.assertEqual(got, alist)
517
518        adict = {1: 3, 5: 7}
519        expected = [1, 3, 5, 7]
520        got = gc.get_referents(adict)
521        got.sort()
522        self.assertEqual(got, expected)
523
524        got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
525        got.sort()
526        self.assertEqual(got, [0, 0] + list(range(5)))
527
528        self.assertEqual(gc.get_referents(1, 'a', 4j), [])
529
530    def test_is_tracked(self):
531        # Atomic built-in types are not tracked, user-defined objects and
532        # mutable containers are.
533        # NOTE: types with special optimizations (e.g. tuple) have tests
534        # in their own test files instead.
535        self.assertFalse(gc.is_tracked(None))
536        self.assertFalse(gc.is_tracked(1))
537        self.assertFalse(gc.is_tracked(1.0))
538        self.assertFalse(gc.is_tracked(1.0 + 5.0j))
539        self.assertFalse(gc.is_tracked(True))
540        self.assertFalse(gc.is_tracked(False))
541        self.assertFalse(gc.is_tracked(b"a"))
542        self.assertFalse(gc.is_tracked("a"))
543        self.assertFalse(gc.is_tracked(bytearray(b"a")))
544        self.assertFalse(gc.is_tracked(type))
545        self.assertFalse(gc.is_tracked(int))
546        self.assertFalse(gc.is_tracked(object))
547        self.assertFalse(gc.is_tracked(object()))
548
549        class UserClass:
550            pass
551
552        class UserInt(int):
553            pass
554
555        # Base class is object; no extra fields.
556        class UserClassSlots:
557            __slots__ = ()
558
559        # Base class is fixed size larger than object; no extra fields.
560        class UserFloatSlots(float):
561            __slots__ = ()
562
563        # Base class is variable size; no extra fields.
564        class UserIntSlots(int):
565            __slots__ = ()
566
567        self.assertTrue(gc.is_tracked(gc))
568        self.assertTrue(gc.is_tracked(UserClass))
569        self.assertTrue(gc.is_tracked(UserClass()))
570        self.assertTrue(gc.is_tracked(UserInt()))
571        self.assertTrue(gc.is_tracked([]))
572        self.assertTrue(gc.is_tracked(set()))
573        self.assertFalse(gc.is_tracked(UserClassSlots()))
574        self.assertFalse(gc.is_tracked(UserFloatSlots()))
575        self.assertFalse(gc.is_tracked(UserIntSlots()))
576
577    def test_bug1055820b(self):
578        # Corresponds to temp2b.py in the bug report.
579
580        ouch = []
581        def callback(ignored):
582            ouch[:] = [wr() for wr in WRs]
583
584        Cs = [C1055820(i) for i in range(2)]
585        WRs = [weakref.ref(c, callback) for c in Cs]
586        c = None
587
588        gc.collect()
589        self.assertEqual(len(ouch), 0)
590        # Make the two instances trash, and collect again.  The bug was that
591        # the callback materialized a strong reference to an instance, but gc
592        # cleared the instance's dict anyway.
593        Cs = None
594        gc.collect()
595        self.assertEqual(len(ouch), 2)  # else the callbacks didn't run
596        for x in ouch:
597            # If the callback resurrected one of these guys, the instance
598            # would be damaged, with an empty __dict__.
599            self.assertEqual(x, None)
600
601    def test_bug21435(self):
602        # This is a poor test - its only virtue is that it happened to
603        # segfault on Tim's Windows box before the patch for 21435 was
604        # applied.  That's a nasty bug relying on specific pieces of cyclic
605        # trash appearing in exactly the right order in finalize_garbage()'s
606        # input list.
607        # But there's no reliable way to force that order from Python code,
608        # so over time chances are good this test won't really be testing much
609        # of anything anymore.  Still, if it blows up, there's _some_
610        # problem ;-)
611        gc.collect()
612
613        class A:
614            pass
615
616        class B:
617            def __init__(self, x):
618                self.x = x
619
620            def __del__(self):
621                self.attr = None
622
623        def do_work():
624            a = A()
625            b = B(A())
626
627            a.attr = b
628            b.attr = a
629
630        do_work()
631        gc.collect() # this blows up (bad C pointer) when it fails
632
633    @cpython_only
634    def test_garbage_at_shutdown(self):
635        import subprocess
636        code = """if 1:
637            import gc
638            import _testcapi
639            @_testcapi.with_tp_del
640            class X:
641                def __init__(self, name):
642                    self.name = name
643                def __repr__(self):
644                    return "<X %%r>" %% self.name
645                def __tp_del__(self):
646                    pass
647
648            x = X('first')
649            x.x = x
650            x.y = X('second')
651            del x
652            gc.set_debug(%s)
653        """
654        def run_command(code):
655            p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
656                stdout=subprocess.PIPE,
657                stderr=subprocess.PIPE)
658            stdout, stderr = p.communicate()
659            p.stdout.close()
660            p.stderr.close()
661            self.assertEqual(p.returncode, 0)
662            self.assertEqual(stdout.strip(), b"")
663            return strip_python_stderr(stderr)
664
665        stderr = run_command(code % "0")
666        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
667                      b"shutdown; use", stderr)
668        self.assertNotIn(b"<X 'first'>", stderr)
669        # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
670        stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
671        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
672                      b"shutdown", stderr)
673        self.assertTrue(
674            (b"[<X 'first'>, <X 'second'>]" in stderr) or
675            (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
676        # With DEBUG_SAVEALL, no additional message should get printed
677        # (because gc.garbage also contains normally reclaimable cyclic
678        # references, and its elements get printed at runtime anyway).
679        stderr = run_command(code % "gc.DEBUG_SAVEALL")
680        self.assertNotIn(b"uncollectable objects at shutdown", stderr)
681
682    @requires_type_collecting
683    def test_gc_main_module_at_shutdown(self):
684        # Create a reference cycle through the __main__ module and check
685        # it gets collected at interpreter shutdown.
686        code = """if 1:
687            class C:
688                def __del__(self):
689                    print('__del__ called')
690            l = [C()]
691            l.append(l)
692            """
693        rc, out, err = assert_python_ok('-c', code)
694        self.assertEqual(out.strip(), b'__del__ called')
695
696    @requires_type_collecting
697    def test_gc_ordinary_module_at_shutdown(self):
698        # Same as above, but with a non-__main__ module.
699        with temp_dir() as script_dir:
700            module = """if 1:
701                class C:
702                    def __del__(self):
703                        print('__del__ called')
704                l = [C()]
705                l.append(l)
706                """
707            code = """if 1:
708                import sys
709                sys.path.insert(0, %r)
710                import gctest
711                """ % (script_dir,)
712            make_script(script_dir, 'gctest', module)
713            rc, out, err = assert_python_ok('-c', code)
714            self.assertEqual(out.strip(), b'__del__ called')
715
716    def test_get_stats(self):
717        stats = gc.get_stats()
718        self.assertEqual(len(stats), 3)
719        for st in stats:
720            self.assertIsInstance(st, dict)
721            self.assertEqual(set(st),
722                             {"collected", "collections", "uncollectable"})
723            self.assertGreaterEqual(st["collected"], 0)
724            self.assertGreaterEqual(st["collections"], 0)
725            self.assertGreaterEqual(st["uncollectable"], 0)
726        # Check that collection counts are incremented correctly
727        if gc.isenabled():
728            self.addCleanup(gc.enable)
729            gc.disable()
730        old = gc.get_stats()
731        gc.collect(0)
732        new = gc.get_stats()
733        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
734        self.assertEqual(new[1]["collections"], old[1]["collections"])
735        self.assertEqual(new[2]["collections"], old[2]["collections"])
736        gc.collect(2)
737        new = gc.get_stats()
738        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
739        self.assertEqual(new[1]["collections"], old[1]["collections"])
740        self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
741
742
743class GCCallbackTests(unittest.TestCase):
744    def setUp(self):
745        # Save gc state and disable it.
746        self.enabled = gc.isenabled()
747        gc.disable()
748        self.debug = gc.get_debug()
749        gc.set_debug(0)
750        gc.callbacks.append(self.cb1)
751        gc.callbacks.append(self.cb2)
752        self.othergarbage = []
753
754    def tearDown(self):
755        # Restore gc state
756        del self.visit
757        gc.callbacks.remove(self.cb1)
758        gc.callbacks.remove(self.cb2)
759        gc.set_debug(self.debug)
760        if self.enabled:
761            gc.enable()
762        # destroy any uncollectables
763        gc.collect()
764        for obj in gc.garbage:
765            if isinstance(obj, Uncollectable):
766                obj.partner = None
767        del gc.garbage[:]
768        del self.othergarbage
769        gc.collect()
770
771    def preclean(self):
772        # Remove all fluff from the system.  Invoke this function
773        # manually rather than through self.setUp() for maximum
774        # safety.
775        self.visit = []
776        gc.collect()
777        garbage, gc.garbage[:] = gc.garbage[:], []
778        self.othergarbage.append(garbage)
779        self.visit = []
780
781    def cb1(self, phase, info):
782        self.visit.append((1, phase, dict(info)))
783
784    def cb2(self, phase, info):
785        self.visit.append((2, phase, dict(info)))
786        if phase == "stop" and hasattr(self, "cleanup"):
787            # Clean Uncollectable from garbage
788            uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
789            gc.garbage[:] = [e for e in gc.garbage
790                             if not isinstance(e, Uncollectable)]
791            for e in uc:
792                e.partner = None
793
794    def test_collect(self):
795        self.preclean()
796        gc.collect()
797        # Algorithmically verify the contents of self.visit
798        # because it is long and tortuous.
799
800        # Count the number of visits to each callback
801        n = [v[0] for v in self.visit]
802        n1 = [i for i in n if i == 1]
803        n2 = [i for i in n if i == 2]
804        self.assertEqual(n1, [1]*2)
805        self.assertEqual(n2, [2]*2)
806
807        # Count that we got the right number of start and stop callbacks.
808        n = [v[1] for v in self.visit]
809        n1 = [i for i in n if i == "start"]
810        n2 = [i for i in n if i == "stop"]
811        self.assertEqual(n1, ["start"]*2)
812        self.assertEqual(n2, ["stop"]*2)
813
814        # Check that we got the right info dict for all callbacks
815        for v in self.visit:
816            info = v[2]
817            self.assertTrue("generation" in info)
818            self.assertTrue("collected" in info)
819            self.assertTrue("uncollectable" in info)
820
821    def test_collect_generation(self):
822        self.preclean()
823        gc.collect(2)
824        for v in self.visit:
825            info = v[2]
826            self.assertEqual(info["generation"], 2)
827
828    @cpython_only
829    def test_collect_garbage(self):
830        self.preclean()
831        # Each of these cause four objects to be garbage: Two
832        # Uncolectables and their instance dicts.
833        Uncollectable()
834        Uncollectable()
835        C1055820(666)
836        gc.collect()
837        for v in self.visit:
838            if v[1] != "stop":
839                continue
840            info = v[2]
841            self.assertEqual(info["collected"], 2)
842            self.assertEqual(info["uncollectable"], 8)
843
844        # We should now have the Uncollectables in gc.garbage
845        self.assertEqual(len(gc.garbage), 4)
846        for e in gc.garbage:
847            self.assertIsInstance(e, Uncollectable)
848
849        # Now, let our callback handle the Uncollectable instances
850        self.cleanup=True
851        self.visit = []
852        gc.garbage[:] = []
853        gc.collect()
854        for v in self.visit:
855            if v[1] != "stop":
856                continue
857            info = v[2]
858            self.assertEqual(info["collected"], 0)
859            self.assertEqual(info["uncollectable"], 4)
860
861        # Uncollectables should be gone
862        self.assertEqual(len(gc.garbage), 0)
863
864
865class GCTogglingTests(unittest.TestCase):
866    def setUp(self):
867        gc.enable()
868
869    def tearDown(self):
870        gc.disable()
871
872    def test_bug1055820c(self):
873        # Corresponds to temp2c.py in the bug report.  This is pretty
874        # elaborate.
875
876        c0 = C1055820(0)
877        # Move c0 into generation 2.
878        gc.collect()
879
880        c1 = C1055820(1)
881        c1.keep_c0_alive = c0
882        del c0.loop # now only c1 keeps c0 alive
883
884        c2 = C1055820(2)
885        c2wr = weakref.ref(c2) # no callback!
886
887        ouch = []
888        def callback(ignored):
889            ouch[:] = [c2wr()]
890
891        # The callback gets associated with a wr on an object in generation 2.
892        c0wr = weakref.ref(c0, callback)
893
894        c0 = c1 = c2 = None
895
896        # What we've set up:  c0, c1, and c2 are all trash now.  c0 is in
897        # generation 2.  The only thing keeping it alive is that c1 points to
898        # it. c1 and c2 are in generation 0, and are in self-loops.  There's a
899        # global weakref to c2 (c2wr), but that weakref has no callback.
900        # There's also a global weakref to c0 (c0wr), and that does have a
901        # callback, and that callback references c2 via c2wr().
902        #
903        #               c0 has a wr with callback, which references c2wr
904        #               ^
905        #               |
906        #               |     Generation 2 above dots
907        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
908        #               |     Generation 0 below dots
909        #               |
910        #               |
911        #            ^->c1   ^->c2 has a wr but no callback
912        #            |  |    |  |
913        #            <--v    <--v
914        #
915        # So this is the nightmare:  when generation 0 gets collected, we see
916        # that c2 has a callback-free weakref, and c1 doesn't even have a
917        # weakref.  Collecting generation 0 doesn't see c0 at all, and c0 is
918        # the only object that has a weakref with a callback.  gc clears c1
919        # and c2.  Clearing c1 has the side effect of dropping the refcount on
920        # c0 to 0, so c0 goes away (despite that it's in an older generation)
921        # and c0's wr callback triggers.  That in turn materializes a reference
922        # to c2 via c2wr(), but c2 gets cleared anyway by gc.
923
924        # We want to let gc happen "naturally", to preserve the distinction
925        # between generations.
926        junk = []
927        i = 0
928        detector = GC_Detector()
929        while not detector.gc_happened:
930            i += 1
931            if i > 10000:
932                self.fail("gc didn't happen after 10000 iterations")
933            self.assertEqual(len(ouch), 0)
934            junk.append([])  # this will eventually trigger gc
935
936        self.assertEqual(len(ouch), 1)  # else the callback wasn't invoked
937        for x in ouch:
938            # If the callback resurrected c2, the instance would be damaged,
939            # with an empty __dict__.
940            self.assertEqual(x, None)
941
942    def test_bug1055820d(self):
943        # Corresponds to temp2d.py in the bug report.  This is very much like
944        # test_bug1055820c, but uses a __del__ method instead of a weakref
945        # callback to sneak in a resurrection of cyclic trash.
946
947        ouch = []
948        class D(C1055820):
949            def __del__(self):
950                ouch[:] = [c2wr()]
951
952        d0 = D(0)
953        # Move all the above into generation 2.
954        gc.collect()
955
956        c1 = C1055820(1)
957        c1.keep_d0_alive = d0
958        del d0.loop # now only c1 keeps d0 alive
959
960        c2 = C1055820(2)
961        c2wr = weakref.ref(c2) # no callback!
962
963        d0 = c1 = c2 = None
964
965        # What we've set up:  d0, c1, and c2 are all trash now.  d0 is in
966        # generation 2.  The only thing keeping it alive is that c1 points to
967        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's
968        # a global weakref to c2 (c2wr), but that weakref has no callback.
969        # There are no other weakrefs.
970        #
971        #               d0 has a __del__ method that references c2wr
972        #               ^
973        #               |
974        #               |     Generation 2 above dots
975        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
976        #               |     Generation 0 below dots
977        #               |
978        #               |
979        #            ^->c1   ^->c2 has a wr but no callback
980        #            |  |    |  |
981        #            <--v    <--v
982        #
983        # So this is the nightmare:  when generation 0 gets collected, we see
984        # that c2 has a callback-free weakref, and c1 doesn't even have a
985        # weakref.  Collecting generation 0 doesn't see d0 at all.  gc clears
986        # c1 and c2.  Clearing c1 has the side effect of dropping the refcount
987        # on d0 to 0, so d0 goes away (despite that it's in an older
988        # generation) and d0's __del__ triggers.  That in turn materializes
989        # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.
990
991        # We want to let gc happen "naturally", to preserve the distinction
992        # between generations.
993        detector = GC_Detector()
994        junk = []
995        i = 0
996        while not detector.gc_happened:
997            i += 1
998            if i > 10000:
999                self.fail("gc didn't happen after 10000 iterations")
1000            self.assertEqual(len(ouch), 0)
1001            junk.append([])  # this will eventually trigger gc
1002
1003        self.assertEqual(len(ouch), 1)  # else __del__ wasn't invoked
1004        for x in ouch:
1005            # If __del__ resurrected c2, the instance would be damaged, with an
1006            # empty __dict__.
1007            self.assertEqual(x, None)
1008
1009def test_main():
1010    enabled = gc.isenabled()
1011    gc.disable()
1012    assert not gc.isenabled()
1013    debug = gc.get_debug()
1014    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak
1015
1016    try:
1017        gc.collect() # Delete 2nd generation garbage
1018        run_unittest(GCTests, GCTogglingTests, GCCallbackTests)
1019    finally:
1020        gc.set_debug(debug)
1021        # test gc.enable() even if GC is disabled by default
1022        if verbose:
1023            print("restoring automatic collection")
1024        # make sure to always test gc.enable()
1025        gc.enable()
1026        assert gc.isenabled()
1027        if not enabled:
1028            gc.disable()
1029
1030if __name__ == "__main__":
1031    test_main()
1032