1import gc
2import sys
3import unittest
4import collections
5import weakref
6import operator
7import contextlib
8import copy
9import time
10
11from test import support
12from test.support import script_helper
13
14# Used in ReferencesTestCase.test_ref_created_during_del() .
15ref_from_del = None
16
17# Used by FinalizeTestCase as a global that may be replaced by None
18# when the interpreter shuts down.
19_global_var = 'foobar'
20
21class C:
22    def method(self):
23        pass
24
25
26class Callable:
27    bar = None
28
29    def __call__(self, x):
30        self.bar = x
31
32
33def create_function():
34    def f(): pass
35    return f
36
37def create_bound_method():
38    return C().method
39
40
41class Object:
42    def __init__(self, arg):
43        self.arg = arg
44    def __repr__(self):
45        return "<Object %r>" % self.arg
46    def __eq__(self, other):
47        if isinstance(other, Object):
48            return self.arg == other.arg
49        return NotImplemented
50    def __lt__(self, other):
51        if isinstance(other, Object):
52            return self.arg < other.arg
53        return NotImplemented
54    def __hash__(self):
55        return hash(self.arg)
56    def some_method(self):
57        return 4
58    def other_method(self):
59        return 5
60
61
62class RefCycle:
63    def __init__(self):
64        self.cycle = self
65
66
67class TestBase(unittest.TestCase):
68
69    def setUp(self):
70        self.cbcalled = 0
71
72    def callback(self, ref):
73        self.cbcalled += 1
74
75
76@contextlib.contextmanager
77def collect_in_thread(period=0.0001):
78    """
79    Ensure GC collections happen in a different thread, at a high frequency.
80    """
81    threading = support.import_module('threading')
82    please_stop = False
83
84    def collect():
85        while not please_stop:
86            time.sleep(period)
87            gc.collect()
88
89    with support.disable_gc():
90        t = threading.Thread(target=collect)
91        t.start()
92        try:
93            yield
94        finally:
95            please_stop = True
96            t.join()
97
98
99class ReferencesTestCase(TestBase):
100
101    def test_basic_ref(self):
102        self.check_basic_ref(C)
103        self.check_basic_ref(create_function)
104        self.check_basic_ref(create_bound_method)
105
106        # Just make sure the tp_repr handler doesn't raise an exception.
107        # Live reference:
108        o = C()
109        wr = weakref.ref(o)
110        repr(wr)
111        # Dead reference:
112        del o
113        repr(wr)
114
115    def test_basic_callback(self):
116        self.check_basic_callback(C)
117        self.check_basic_callback(create_function)
118        self.check_basic_callback(create_bound_method)
119
120    @support.cpython_only
121    def test_cfunction(self):
122        import _testcapi
123        create_cfunction = _testcapi.create_cfunction
124        f = create_cfunction()
125        wr = weakref.ref(f)
126        self.assertIs(wr(), f)
127        del f
128        self.assertIsNone(wr())
129        self.check_basic_ref(create_cfunction)
130        self.check_basic_callback(create_cfunction)
131
132    def test_multiple_callbacks(self):
133        o = C()
134        ref1 = weakref.ref(o, self.callback)
135        ref2 = weakref.ref(o, self.callback)
136        del o
137        self.assertIsNone(ref1(), "expected reference to be invalidated")
138        self.assertIsNone(ref2(), "expected reference to be invalidated")
139        self.assertEqual(self.cbcalled, 2,
140                     "callback not called the right number of times")
141
142    def test_multiple_selfref_callbacks(self):
143        # Make sure all references are invalidated before callbacks are called
144        #
145        # What's important here is that we're using the first
146        # reference in the callback invoked on the second reference
147        # (the most recently created ref is cleaned up first).  This
148        # tests that all references to the object are invalidated
149        # before any of the callbacks are invoked, so that we only
150        # have one invocation of _weakref.c:cleanup_helper() active
151        # for a particular object at a time.
152        #
153        def callback(object, self=self):
154            self.ref()
155        c = C()
156        self.ref = weakref.ref(c, callback)
157        ref1 = weakref.ref(c, callback)
158        del c
159
160    def test_constructor_kwargs(self):
161        c = C()
162        self.assertRaises(TypeError, weakref.ref, c, callback=None)
163
164    def test_proxy_ref(self):
165        o = C()
166        o.bar = 1
167        ref1 = weakref.proxy(o, self.callback)
168        ref2 = weakref.proxy(o, self.callback)
169        del o
170
171        def check(proxy):
172            proxy.bar
173
174        self.assertRaises(ReferenceError, check, ref1)
175        self.assertRaises(ReferenceError, check, ref2)
176        self.assertRaises(ReferenceError, bool, weakref.proxy(C()))
177        self.assertEqual(self.cbcalled, 2)
178
179    def check_basic_ref(self, factory):
180        o = factory()
181        ref = weakref.ref(o)
182        self.assertIsNotNone(ref(),
183                     "weak reference to live object should be live")
184        o2 = ref()
185        self.assertIs(o, o2,
186                     "<ref>() should return original object if live")
187
188    def check_basic_callback(self, factory):
189        self.cbcalled = 0
190        o = factory()
191        ref = weakref.ref(o, self.callback)
192        del o
193        self.assertEqual(self.cbcalled, 1,
194                     "callback did not properly set 'cbcalled'")
195        self.assertIsNone(ref(),
196                     "ref2 should be dead after deleting object reference")
197
198    def test_ref_reuse(self):
199        o = C()
200        ref1 = weakref.ref(o)
201        # create a proxy to make sure that there's an intervening creation
202        # between these two; it should make no difference
203        proxy = weakref.proxy(o)
204        ref2 = weakref.ref(o)
205        self.assertIs(ref1, ref2,
206                     "reference object w/out callback should be re-used")
207
208        o = C()
209        proxy = weakref.proxy(o)
210        ref1 = weakref.ref(o)
211        ref2 = weakref.ref(o)
212        self.assertIs(ref1, ref2,
213                     "reference object w/out callback should be re-used")
214        self.assertEqual(weakref.getweakrefcount(o), 2,
215                     "wrong weak ref count for object")
216        del proxy
217        self.assertEqual(weakref.getweakrefcount(o), 1,
218                     "wrong weak ref count for object after deleting proxy")
219
220    def test_proxy_reuse(self):
221        o = C()
222        proxy1 = weakref.proxy(o)
223        ref = weakref.ref(o)
224        proxy2 = weakref.proxy(o)
225        self.assertIs(proxy1, proxy2,
226                     "proxy object w/out callback should have been re-used")
227
228    def test_basic_proxy(self):
229        o = C()
230        self.check_proxy(o, weakref.proxy(o))
231
232        L = collections.UserList()
233        p = weakref.proxy(L)
234        self.assertFalse(p, "proxy for empty UserList should be false")
235        p.append(12)
236        self.assertEqual(len(L), 1)
237        self.assertTrue(p, "proxy for non-empty UserList should be true")
238        p[:] = [2, 3]
239        self.assertEqual(len(L), 2)
240        self.assertEqual(len(p), 2)
241        self.assertIn(3, p, "proxy didn't support __contains__() properly")
242        p[1] = 5
243        self.assertEqual(L[1], 5)
244        self.assertEqual(p[1], 5)
245        L2 = collections.UserList(L)
246        p2 = weakref.proxy(L2)
247        self.assertEqual(p, p2)
248        ## self.assertEqual(repr(L2), repr(p2))
249        L3 = collections.UserList(range(10))
250        p3 = weakref.proxy(L3)
251        self.assertEqual(L3[:], p3[:])
252        self.assertEqual(L3[5:], p3[5:])
253        self.assertEqual(L3[:5], p3[:5])
254        self.assertEqual(L3[2:5], p3[2:5])
255
256    def test_proxy_unicode(self):
257        # See bug 5037
258        class C(object):
259            def __str__(self):
260                return "string"
261            def __bytes__(self):
262                return b"bytes"
263        instance = C()
264        self.assertIn("__bytes__", dir(weakref.proxy(instance)))
265        self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
266
267    def test_proxy_index(self):
268        class C:
269            def __index__(self):
270                return 10
271        o = C()
272        p = weakref.proxy(o)
273        self.assertEqual(operator.index(p), 10)
274
275    def test_proxy_div(self):
276        class C:
277            def __floordiv__(self, other):
278                return 42
279            def __ifloordiv__(self, other):
280                return 21
281        o = C()
282        p = weakref.proxy(o)
283        self.assertEqual(p // 5, 42)
284        p //= 5
285        self.assertEqual(p, 21)
286
287    # The PyWeakref_* C API is documented as allowing either NULL or
288    # None as the value for the callback, where either means "no
289    # callback".  The "no callback" ref and proxy objects are supposed
290    # to be shared so long as they exist by all callers so long as
291    # they are active.  In Python 2.3.3 and earlier, this guarantee
292    # was not honored, and was broken in different ways for
293    # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)
294
295    def test_shared_ref_without_callback(self):
296        self.check_shared_without_callback(weakref.ref)
297
298    def test_shared_proxy_without_callback(self):
299        self.check_shared_without_callback(weakref.proxy)
300
301    def check_shared_without_callback(self, makeref):
302        o = Object(1)
303        p1 = makeref(o, None)
304        p2 = makeref(o, None)
305        self.assertIs(p1, p2, "both callbacks were None in the C API")
306        del p1, p2
307        p1 = makeref(o)
308        p2 = makeref(o, None)
309        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
310        del p1, p2
311        p1 = makeref(o)
312        p2 = makeref(o)
313        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
314        del p1, p2
315        p1 = makeref(o, None)
316        p2 = makeref(o)
317        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
318
319    def test_callable_proxy(self):
320        o = Callable()
321        ref1 = weakref.proxy(o)
322
323        self.check_proxy(o, ref1)
324
325        self.assertIs(type(ref1), weakref.CallableProxyType,
326                     "proxy is not of callable type")
327        ref1('twinkies!')
328        self.assertEqual(o.bar, 'twinkies!',
329                     "call through proxy not passed through to original")
330        ref1(x='Splat.')
331        self.assertEqual(o.bar, 'Splat.',
332                     "call through proxy not passed through to original")
333
334        # expect due to too few args
335        self.assertRaises(TypeError, ref1)
336
337        # expect due to too many args
338        self.assertRaises(TypeError, ref1, 1, 2, 3)
339
340    def check_proxy(self, o, proxy):
341        o.foo = 1
342        self.assertEqual(proxy.foo, 1,
343                     "proxy does not reflect attribute addition")
344        o.foo = 2
345        self.assertEqual(proxy.foo, 2,
346                     "proxy does not reflect attribute modification")
347        del o.foo
348        self.assertFalse(hasattr(proxy, 'foo'),
349                     "proxy does not reflect attribute removal")
350
351        proxy.foo = 1
352        self.assertEqual(o.foo, 1,
353                     "object does not reflect attribute addition via proxy")
354        proxy.foo = 2
355        self.assertEqual(o.foo, 2,
356            "object does not reflect attribute modification via proxy")
357        del proxy.foo
358        self.assertFalse(hasattr(o, 'foo'),
359                     "object does not reflect attribute removal via proxy")
360
361    def test_proxy_deletion(self):
362        # Test clearing of SF bug #762891
363        class Foo:
364            result = None
365            def __delitem__(self, accessor):
366                self.result = accessor
367        g = Foo()
368        f = weakref.proxy(g)
369        del f[0]
370        self.assertEqual(f.result, 0)
371
372    def test_proxy_bool(self):
373        # Test clearing of SF bug #1170766
374        class List(list): pass
375        lyst = List()
376        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
377
378    def test_getweakrefcount(self):
379        o = C()
380        ref1 = weakref.ref(o)
381        ref2 = weakref.ref(o, self.callback)
382        self.assertEqual(weakref.getweakrefcount(o), 2,
383                     "got wrong number of weak reference objects")
384
385        proxy1 = weakref.proxy(o)
386        proxy2 = weakref.proxy(o, self.callback)
387        self.assertEqual(weakref.getweakrefcount(o), 4,
388                     "got wrong number of weak reference objects")
389
390        del ref1, ref2, proxy1, proxy2
391        self.assertEqual(weakref.getweakrefcount(o), 0,
392                     "weak reference objects not unlinked from"
393                     " referent when discarded.")
394
395        # assumes ints do not support weakrefs
396        self.assertEqual(weakref.getweakrefcount(1), 0,
397                     "got wrong number of weak reference objects for int")
398
399    def test_getweakrefs(self):
400        o = C()
401        ref1 = weakref.ref(o, self.callback)
402        ref2 = weakref.ref(o, self.callback)
403        del ref1
404        self.assertEqual(weakref.getweakrefs(o), [ref2],
405                     "list of refs does not match")
406
407        o = C()
408        ref1 = weakref.ref(o, self.callback)
409        ref2 = weakref.ref(o, self.callback)
410        del ref2
411        self.assertEqual(weakref.getweakrefs(o), [ref1],
412                     "list of refs does not match")
413
414        del ref1
415        self.assertEqual(weakref.getweakrefs(o), [],
416                     "list of refs not cleared")
417
418        # assumes ints do not support weakrefs
419        self.assertEqual(weakref.getweakrefs(1), [],
420                     "list of refs does not match for int")
421
422    def test_newstyle_number_ops(self):
423        class F(float):
424            pass
425        f = F(2.0)
426        p = weakref.proxy(f)
427        self.assertEqual(p + 1.0, 3.0)
428        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV
429
430    def test_callbacks_protected(self):
431        # Callbacks protected from already-set exceptions?
432        # Regression test for SF bug #478534.
433        class BogusError(Exception):
434            pass
435        data = {}
436        def remove(k):
437            del data[k]
438        def encapsulate():
439            f = lambda : ()
440            data[weakref.ref(f, remove)] = None
441            raise BogusError
442        try:
443            encapsulate()
444        except BogusError:
445            pass
446        else:
447            self.fail("exception not properly restored")
448        try:
449            encapsulate()
450        except BogusError:
451            pass
452        else:
453            self.fail("exception not properly restored")
454
455    def test_sf_bug_840829(self):
456        # "weakref callbacks and gc corrupt memory"
457        # subtype_dealloc erroneously exposed a new-style instance
458        # already in the process of getting deallocated to gc,
459        # causing double-deallocation if the instance had a weakref
460        # callback that triggered gc.
461        # If the bug exists, there probably won't be an obvious symptom
462        # in a release build.  In a debug build, a segfault will occur
463        # when the second attempt to remove the instance from the "list
464        # of all objects" occurs.
465
466        import gc
467
468        class C(object):
469            pass
470
471        c = C()
472        wr = weakref.ref(c, lambda ignore: gc.collect())
473        del c
474
475        # There endeth the first part.  It gets worse.
476        del wr
477
478        c1 = C()
479        c1.i = C()
480        wr = weakref.ref(c1.i, lambda ignore: gc.collect())
481
482        c2 = C()
483        c2.c1 = c1
484        del c1  # still alive because c2 points to it
485
486        # Now when subtype_dealloc gets called on c2, it's not enough just
487        # that c2 is immune from gc while the weakref callbacks associated
488        # with c2 execute (there are none in this 2nd half of the test, btw).
489        # subtype_dealloc goes on to call the base classes' deallocs too,
490        # so any gc triggered by weakref callbacks associated with anything
491        # torn down by a base class dealloc can also trigger double
492        # deallocation of c2.
493        del c2
494
495    def test_callback_in_cycle_1(self):
496        import gc
497
498        class J(object):
499            pass
500
501        class II(object):
502            def acallback(self, ignore):
503                self.J
504
505        I = II()
506        I.J = J
507        I.wr = weakref.ref(J, I.acallback)
508
509        # Now J and II are each in a self-cycle (as all new-style class
510        # objects are, since their __mro__ points back to them).  I holds
511        # both a weak reference (I.wr) and a strong reference (I.J) to class
512        # J.  I is also in a cycle (I.wr points to a weakref that references
513        # I.acallback).  When we del these three, they all become trash, but
514        # the cycles prevent any of them from getting cleaned up immediately.
515        # Instead they have to wait for cyclic gc to deduce that they're
516        # trash.
517        #
518        # gc used to call tp_clear on all of them, and the order in which
519        # it does that is pretty accidental.  The exact order in which we
520        # built up these things manages to provoke gc into running tp_clear
521        # in just the right order (I last).  Calling tp_clear on II leaves
522        # behind an insane class object (its __mro__ becomes NULL).  Calling
523        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
524        # just then because of the strong reference from I.J.  Calling
525        # tp_clear on I starts to clear I's __dict__, and just happens to
526        # clear I.J first -- I.wr is still intact.  That removes the last
527        # reference to J, which triggers the weakref callback.  The callback
528        # tries to do "self.J", and instances of new-style classes look up
529        # attributes ("J") in the class dict first.  The class (II) wants to
530        # search II.__mro__, but that's NULL.   The result was a segfault in
531        # a release build, and an assert failure in a debug build.
532        del I, J, II
533        gc.collect()
534
535    def test_callback_in_cycle_2(self):
536        import gc
537
538        # This is just like test_callback_in_cycle_1, except that II is an
539        # old-style class.  The symptom is different then:  an instance of an
540        # old-style class looks in its own __dict__ first.  'J' happens to
541        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
542        # __dict__, so the attribute isn't found.  The difference is that
543        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
544        # __mro__), so no segfault occurs.  Instead it got:
545        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
546        #    Exception exceptions.AttributeError:
547        #   "II instance has no attribute 'J'" in <bound method II.acallback
548        #       of <?.II instance at 0x00B9B4B8>> ignored
549
550        class J(object):
551            pass
552
553        class II:
554            def acallback(self, ignore):
555                self.J
556
557        I = II()
558        I.J = J
559        I.wr = weakref.ref(J, I.acallback)
560
561        del I, J, II
562        gc.collect()
563
564    def test_callback_in_cycle_3(self):
565        import gc
566
567        # This one broke the first patch that fixed the last two.  In this
568        # case, the objects reachable from the callback aren't also reachable
569        # from the object (c1) *triggering* the callback:  you can get to
570        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
571        # got tp_clear'ed by the time the c2.cb callback got invoked.
572
573        class C:
574            def cb(self, ignore):
575                self.me
576                self.c1
577                self.wr
578
579        c1, c2 = C(), C()
580
581        c2.me = c2
582        c2.c1 = c1
583        c2.wr = weakref.ref(c1, c2.cb)
584
585        del c1, c2
586        gc.collect()
587
588    def test_callback_in_cycle_4(self):
589        import gc
590
591        # Like test_callback_in_cycle_3, except c2 and c1 have different
592        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
593        # objects reachable from the dying object (c1) isn't enough to stop
594        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
595        # The result was a segfault (C.__mro__ was NULL when the callback
596        # tried to look up self.me).
597
598        class C(object):
599            def cb(self, ignore):
600                self.me
601                self.c1
602                self.wr
603
604        class D:
605            pass
606
607        c1, c2 = D(), C()
608
609        c2.me = c2
610        c2.c1 = c1
611        c2.wr = weakref.ref(c1, c2.cb)
612
613        del c1, c2, C, D
614        gc.collect()
615
616    @support.requires_type_collecting
617    def test_callback_in_cycle_resurrection(self):
618        import gc
619
620        # Do something nasty in a weakref callback:  resurrect objects
621        # from dead cycles.  For this to be attempted, the weakref and
622        # its callback must also be part of the cyclic trash (else the
623        # objects reachable via the callback couldn't be in cyclic trash
624        # to begin with -- the callback would act like an external root).
625        # But gc clears trash weakrefs with callbacks early now, which
626        # disables the callbacks, so the callbacks shouldn't get called
627        # at all (and so nothing actually gets resurrected).
628
629        alist = []
630        class C(object):
631            def __init__(self, value):
632                self.attribute = value
633
634            def acallback(self, ignore):
635                alist.append(self.c)
636
637        c1, c2 = C(1), C(2)
638        c1.c = c2
639        c2.c = c1
640        c1.wr = weakref.ref(c2, c1.acallback)
641        c2.wr = weakref.ref(c1, c2.acallback)
642
643        def C_went_away(ignore):
644            alist.append("C went away")
645        wr = weakref.ref(C, C_went_away)
646
647        del c1, c2, C   # make them all trash
648        self.assertEqual(alist, [])  # del isn't enough to reclaim anything
649
650        gc.collect()
651        # c1.wr and c2.wr were part of the cyclic trash, so should have
652        # been cleared without their callbacks executing.  OTOH, the weakref
653        # to C is bound to a function local (wr), and wasn't trash, so that
654        # callback should have been invoked when C went away.
655        self.assertEqual(alist, ["C went away"])
656        # The remaining weakref should be dead now (its callback ran).
657        self.assertEqual(wr(), None)
658
659        del alist[:]
660        gc.collect()
661        self.assertEqual(alist, [])
662
663    def test_callbacks_on_callback(self):
664        import gc
665
666        # Set up weakref callbacks *on* weakref callbacks.
667        alist = []
668        def safe_callback(ignore):
669            alist.append("safe_callback called")
670
671        class C(object):
672            def cb(self, ignore):
673                alist.append("cb called")
674
675        c, d = C(), C()
676        c.other = d
677        d.other = c
678        callback = c.cb
679        c.wr = weakref.ref(d, callback)     # this won't trigger
680        d.wr = weakref.ref(callback, d.cb)  # ditto
681        external_wr = weakref.ref(callback, safe_callback)  # but this will
682        self.assertIs(external_wr(), callback)
683
684        # The weakrefs attached to c and d should get cleared, so that
685        # C.cb is never called.  But external_wr isn't part of the cyclic
686        # trash, and no cyclic trash is reachable from it, so safe_callback
687        # should get invoked when the bound method object callback (c.cb)
688        # -- which is itself a callback, and also part of the cyclic trash --
689        # gets reclaimed at the end of gc.
690
691        del callback, c, d, C
692        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
693        gc.collect()
694        self.assertEqual(alist, ["safe_callback called"])
695        self.assertEqual(external_wr(), None)
696
697        del alist[:]
698        gc.collect()
699        self.assertEqual(alist, [])
700
701    def test_gc_during_ref_creation(self):
702        self.check_gc_during_creation(weakref.ref)
703
704    def test_gc_during_proxy_creation(self):
705        self.check_gc_during_creation(weakref.proxy)
706
707    def check_gc_during_creation(self, makeref):
708        thresholds = gc.get_threshold()
709        gc.set_threshold(1, 1, 1)
710        gc.collect()
711        class A:
712            pass
713
714        def callback(*args):
715            pass
716
717        referenced = A()
718
719        a = A()
720        a.a = a
721        a.wr = makeref(referenced)
722
723        try:
724            # now make sure the object and the ref get labeled as
725            # cyclic trash:
726            a = A()
727            weakref.ref(referenced, callback)
728
729        finally:
730            gc.set_threshold(*thresholds)
731
732    def test_ref_created_during_del(self):
733        # Bug #1377858
734        # A weakref created in an object's __del__() would crash the
735        # interpreter when the weakref was cleaned up since it would refer to
736        # non-existent memory.  This test should not segfault the interpreter.
737        class Target(object):
738            def __del__(self):
739                global ref_from_del
740                ref_from_del = weakref.ref(self)
741
742        w = Target()
743
744    def test_init(self):
745        # Issue 3634
746        # <weakref to class>.__init__() doesn't check errors correctly
747        r = weakref.ref(Exception)
748        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
749        # No exception should be raised here
750        gc.collect()
751
752    def test_classes(self):
753        # Check that classes are weakrefable.
754        class A(object):
755            pass
756        l = []
757        weakref.ref(int)
758        a = weakref.ref(A, l.append)
759        A = None
760        gc.collect()
761        self.assertEqual(a(), None)
762        self.assertEqual(l, [a])
763
764    def test_equality(self):
765        # Alive weakrefs defer equality testing to their underlying object.
766        x = Object(1)
767        y = Object(1)
768        z = Object(2)
769        a = weakref.ref(x)
770        b = weakref.ref(y)
771        c = weakref.ref(z)
772        d = weakref.ref(x)
773        # Note how we directly test the operators here, to stress both
774        # __eq__ and __ne__.
775        self.assertTrue(a == b)
776        self.assertFalse(a != b)
777        self.assertFalse(a == c)
778        self.assertTrue(a != c)
779        self.assertTrue(a == d)
780        self.assertFalse(a != d)
781        del x, y, z
782        gc.collect()
783        for r in a, b, c:
784            # Sanity check
785            self.assertIs(r(), None)
786        # Dead weakrefs compare by identity: whether `a` and `d` are the
787        # same weakref object is an implementation detail, since they pointed
788        # to the same original object and didn't have a callback.
789        # (see issue #16453).
790        self.assertFalse(a == b)
791        self.assertTrue(a != b)
792        self.assertFalse(a == c)
793        self.assertTrue(a != c)
794        self.assertEqual(a == d, a is d)
795        self.assertEqual(a != d, a is not d)
796
797    def test_ordering(self):
798        # weakrefs cannot be ordered, even if the underlying objects can.
799        ops = [operator.lt, operator.gt, operator.le, operator.ge]
800        x = Object(1)
801        y = Object(1)
802        a = weakref.ref(x)
803        b = weakref.ref(y)
804        for op in ops:
805            self.assertRaises(TypeError, op, a, b)
806        # Same when dead.
807        del x, y
808        gc.collect()
809        for op in ops:
810            self.assertRaises(TypeError, op, a, b)
811
812    def test_hashing(self):
813        # Alive weakrefs hash the same as the underlying object
814        x = Object(42)
815        y = Object(42)
816        a = weakref.ref(x)
817        b = weakref.ref(y)
818        self.assertEqual(hash(a), hash(42))
819        del x, y
820        gc.collect()
821        # Dead weakrefs:
822        # - retain their hash is they were hashed when alive;
823        # - otherwise, cannot be hashed.
824        self.assertEqual(hash(a), hash(42))
825        self.assertRaises(TypeError, hash, b)
826
827    def test_trashcan_16602(self):
828        # Issue #16602: when a weakref's target was part of a long
829        # deallocation chain, the trashcan mechanism could delay clearing
830        # of the weakref and make the target object visible from outside
831        # code even though its refcount had dropped to 0.  A crash ensued.
832        class C:
833            def __init__(self, parent):
834                if not parent:
835                    return
836                wself = weakref.ref(self)
837                def cb(wparent):
838                    o = wself()
839                self.wparent = weakref.ref(parent, cb)
840
841        d = weakref.WeakKeyDictionary()
842        root = c = C(None)
843        for n in range(100):
844            d[c] = c = C(c)
845        del root
846        gc.collect()
847
848    def test_callback_attribute(self):
849        x = Object(1)
850        callback = lambda ref: None
851        ref1 = weakref.ref(x, callback)
852        self.assertIs(ref1.__callback__, callback)
853
854        ref2 = weakref.ref(x)
855        self.assertIsNone(ref2.__callback__)
856
857    def test_callback_attribute_after_deletion(self):
858        x = Object(1)
859        ref = weakref.ref(x, self.callback)
860        self.assertIsNotNone(ref.__callback__)
861        del x
862        support.gc_collect()
863        self.assertIsNone(ref.__callback__)
864
865    def test_set_callback_attribute(self):
866        x = Object(1)
867        callback = lambda ref: None
868        ref1 = weakref.ref(x, callback)
869        with self.assertRaises(AttributeError):
870            ref1.__callback__ = lambda ref: None
871
872    def test_callback_gcs(self):
873        class ObjectWithDel(Object):
874            def __del__(self): pass
875        x = ObjectWithDel(1)
876        ref1 = weakref.ref(x, lambda ref: support.gc_collect())
877        del x
878        support.gc_collect()
879
880
881class SubclassableWeakrefTestCase(TestBase):
882
883    def test_subclass_refs(self):
884        class MyRef(weakref.ref):
885            def __init__(self, ob, callback=None, value=42):
886                self.value = value
887                super().__init__(ob, callback)
888            def __call__(self):
889                self.called = True
890                return super().__call__()
891        o = Object("foo")
892        mr = MyRef(o, value=24)
893        self.assertIs(mr(), o)
894        self.assertTrue(mr.called)
895        self.assertEqual(mr.value, 24)
896        del o
897        self.assertIsNone(mr())
898        self.assertTrue(mr.called)
899
900    def test_subclass_refs_dont_replace_standard_refs(self):
901        class MyRef(weakref.ref):
902            pass
903        o = Object(42)
904        r1 = MyRef(o)
905        r2 = weakref.ref(o)
906        self.assertIsNot(r1, r2)
907        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
908        self.assertEqual(weakref.getweakrefcount(o), 2)
909        r3 = MyRef(o)
910        self.assertEqual(weakref.getweakrefcount(o), 3)
911        refs = weakref.getweakrefs(o)
912        self.assertEqual(len(refs), 3)
913        self.assertIs(r2, refs[0])
914        self.assertIn(r1, refs[1:])
915        self.assertIn(r3, refs[1:])
916
917    def test_subclass_refs_dont_conflate_callbacks(self):
918        class MyRef(weakref.ref):
919            pass
920        o = Object(42)
921        r1 = MyRef(o, id)
922        r2 = MyRef(o, str)
923        self.assertIsNot(r1, r2)
924        refs = weakref.getweakrefs(o)
925        self.assertIn(r1, refs)
926        self.assertIn(r2, refs)
927
928    def test_subclass_refs_with_slots(self):
929        class MyRef(weakref.ref):
930            __slots__ = "slot1", "slot2"
931            def __new__(type, ob, callback, slot1, slot2):
932                return weakref.ref.__new__(type, ob, callback)
933            def __init__(self, ob, callback, slot1, slot2):
934                self.slot1 = slot1
935                self.slot2 = slot2
936            def meth(self):
937                return self.slot1 + self.slot2
938        o = Object(42)
939        r = MyRef(o, None, "abc", "def")
940        self.assertEqual(r.slot1, "abc")
941        self.assertEqual(r.slot2, "def")
942        self.assertEqual(r.meth(), "abcdef")
943        self.assertFalse(hasattr(r, "__dict__"))
944
945    def test_subclass_refs_with_cycle(self):
946        """Confirm https://bugs.python.org/issue3100 is fixed."""
947        # An instance of a weakref subclass can have attributes.
948        # If such a weakref holds the only strong reference to the object,
949        # deleting the weakref will delete the object. In this case,
950        # the callback must not be called, because the ref object is
951        # being deleted.
952        class MyRef(weakref.ref):
953            pass
954
955        # Use a local callback, for "regrtest -R::"
956        # to detect refcounting problems
957        def callback(w):
958            self.cbcalled += 1
959
960        o = C()
961        r1 = MyRef(o, callback)
962        r1.o = o
963        del o
964
965        del r1 # Used to crash here
966
967        self.assertEqual(self.cbcalled, 0)
968
969        # Same test, with two weakrefs to the same object
970        # (since code paths are different)
971        o = C()
972        r1 = MyRef(o, callback)
973        r2 = MyRef(o, callback)
974        r1.r = r2
975        r2.o = o
976        del o
977        del r2
978
979        del r1 # Used to crash here
980
981        self.assertEqual(self.cbcalled, 0)
982
983
984class WeakMethodTestCase(unittest.TestCase):
985
986    def _subclass(self):
987        """Return an Object subclass overriding `some_method`."""
988        class C(Object):
989            def some_method(self):
990                return 6
991        return C
992
993    def test_alive(self):
994        o = Object(1)
995        r = weakref.WeakMethod(o.some_method)
996        self.assertIsInstance(r, weakref.ReferenceType)
997        self.assertIsInstance(r(), type(o.some_method))
998        self.assertIs(r().__self__, o)
999        self.assertIs(r().__func__, o.some_method.__func__)
1000        self.assertEqual(r()(), 4)
1001
1002    def test_object_dead(self):
1003        o = Object(1)
1004        r = weakref.WeakMethod(o.some_method)
1005        del o
1006        gc.collect()
1007        self.assertIs(r(), None)
1008
1009    def test_method_dead(self):
1010        C = self._subclass()
1011        o = C(1)
1012        r = weakref.WeakMethod(o.some_method)
1013        del C.some_method
1014        gc.collect()
1015        self.assertIs(r(), None)
1016
1017    def test_callback_when_object_dead(self):
1018        # Test callback behaviour when object dies first.
1019        C = self._subclass()
1020        calls = []
1021        def cb(arg):
1022            calls.append(arg)
1023        o = C(1)
1024        r = weakref.WeakMethod(o.some_method, cb)
1025        del o
1026        gc.collect()
1027        self.assertEqual(calls, [r])
1028        # Callback is only called once.
1029        C.some_method = Object.some_method
1030        gc.collect()
1031        self.assertEqual(calls, [r])
1032
1033    def test_callback_when_method_dead(self):
1034        # Test callback behaviour when method dies first.
1035        C = self._subclass()
1036        calls = []
1037        def cb(arg):
1038            calls.append(arg)
1039        o = C(1)
1040        r = weakref.WeakMethod(o.some_method, cb)
1041        del C.some_method
1042        gc.collect()
1043        self.assertEqual(calls, [r])
1044        # Callback is only called once.
1045        del o
1046        gc.collect()
1047        self.assertEqual(calls, [r])
1048
1049    @support.cpython_only
1050    def test_no_cycles(self):
1051        # A WeakMethod doesn't create any reference cycle to itself.
1052        o = Object(1)
1053        def cb(_):
1054            pass
1055        r = weakref.WeakMethod(o.some_method, cb)
1056        wr = weakref.ref(r)
1057        del r
1058        self.assertIs(wr(), None)
1059
1060    def test_equality(self):
1061        def _eq(a, b):
1062            self.assertTrue(a == b)
1063            self.assertFalse(a != b)
1064        def _ne(a, b):
1065            self.assertTrue(a != b)
1066            self.assertFalse(a == b)
1067        x = Object(1)
1068        y = Object(1)
1069        a = weakref.WeakMethod(x.some_method)
1070        b = weakref.WeakMethod(y.some_method)
1071        c = weakref.WeakMethod(x.other_method)
1072        d = weakref.WeakMethod(y.other_method)
1073        # Objects equal, same method
1074        _eq(a, b)
1075        _eq(c, d)
1076        # Objects equal, different method
1077        _ne(a, c)
1078        _ne(a, d)
1079        _ne(b, c)
1080        _ne(b, d)
1081        # Objects unequal, same or different method
1082        z = Object(2)
1083        e = weakref.WeakMethod(z.some_method)
1084        f = weakref.WeakMethod(z.other_method)
1085        _ne(a, e)
1086        _ne(a, f)
1087        _ne(b, e)
1088        _ne(b, f)
1089        del x, y, z
1090        gc.collect()
1091        # Dead WeakMethods compare by identity
1092        refs = a, b, c, d, e, f
1093        for q in refs:
1094            for r in refs:
1095                self.assertEqual(q == r, q is r)
1096                self.assertEqual(q != r, q is not r)
1097
1098    def test_hashing(self):
1099        # Alive WeakMethods are hashable if the underlying object is
1100        # hashable.
1101        x = Object(1)
1102        y = Object(1)
1103        a = weakref.WeakMethod(x.some_method)
1104        b = weakref.WeakMethod(y.some_method)
1105        c = weakref.WeakMethod(y.other_method)
1106        # Since WeakMethod objects are equal, the hashes should be equal.
1107        self.assertEqual(hash(a), hash(b))
1108        ha = hash(a)
1109        # Dead WeakMethods retain their old hash value
1110        del x, y
1111        gc.collect()
1112        self.assertEqual(hash(a), ha)
1113        self.assertEqual(hash(b), ha)
1114        # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
1115        self.assertRaises(TypeError, hash, c)
1116
1117
1118class MappingTestCase(TestBase):
1119
1120    COUNT = 10
1121
1122    def check_len_cycles(self, dict_type, cons):
1123        N = 20
1124        items = [RefCycle() for i in range(N)]
1125        dct = dict_type(cons(o) for o in items)
1126        # Keep an iterator alive
1127        it = dct.items()
1128        try:
1129            next(it)
1130        except StopIteration:
1131            pass
1132        del items
1133        gc.collect()
1134        n1 = len(dct)
1135        del it
1136        gc.collect()
1137        n2 = len(dct)
1138        # one item may be kept alive inside the iterator
1139        self.assertIn(n1, (0, 1))
1140        self.assertEqual(n2, 0)
1141
1142    def test_weak_keyed_len_cycles(self):
1143        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
1144
1145    def test_weak_valued_len_cycles(self):
1146        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
1147
1148    def check_len_race(self, dict_type, cons):
1149        # Extended sanity checks for len() in the face of cyclic collection
1150        self.addCleanup(gc.set_threshold, *gc.get_threshold())
1151        for th in range(1, 100):
1152            N = 20
1153            gc.collect(0)
1154            gc.set_threshold(th, th, th)
1155            items = [RefCycle() for i in range(N)]
1156            dct = dict_type(cons(o) for o in items)
1157            del items
1158            # All items will be collected at next garbage collection pass
1159            it = dct.items()
1160            try:
1161                next(it)
1162            except StopIteration:
1163                pass
1164            n1 = len(dct)
1165            del it
1166            n2 = len(dct)
1167            self.assertGreaterEqual(n1, 0)
1168            self.assertLessEqual(n1, N)
1169            self.assertGreaterEqual(n2, 0)
1170            self.assertLessEqual(n2, n1)
1171
1172    def test_weak_keyed_len_race(self):
1173        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
1174
1175    def test_weak_valued_len_race(self):
1176        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
1177
1178    def test_weak_values(self):
1179        #
1180        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
1181        #
1182        dict, objects = self.make_weak_valued_dict()
1183        for o in objects:
1184            self.assertEqual(weakref.getweakrefcount(o), 1)
1185            self.assertIs(o, dict[o.arg],
1186                         "wrong object returned by weak dict!")
1187        items1 = list(dict.items())
1188        items2 = list(dict.copy().items())
1189        items1.sort()
1190        items2.sort()
1191        self.assertEqual(items1, items2,
1192                     "cloning of weak-valued dictionary did not work!")
1193        del items1, items2
1194        self.assertEqual(len(dict), self.COUNT)
1195        del objects[0]
1196        self.assertEqual(len(dict), self.COUNT - 1,
1197                     "deleting object did not cause dictionary update")
1198        del objects, o
1199        self.assertEqual(len(dict), 0,
1200                     "deleting the values did not clear the dictionary")
1201        # regression on SF bug #447152:
1202        dict = weakref.WeakValueDictionary()
1203        self.assertRaises(KeyError, dict.__getitem__, 1)
1204        dict[2] = C()
1205        self.assertRaises(KeyError, dict.__getitem__, 2)
1206
1207    def test_weak_keys(self):
1208        #
1209        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
1210        #  len(d), k in d.
1211        #
1212        dict, objects = self.make_weak_keyed_dict()
1213        for o in objects:
1214            self.assertEqual(weakref.getweakrefcount(o), 1,
1215                         "wrong number of weak references to %r!" % o)
1216            self.assertIs(o.arg, dict[o],
1217                         "wrong object returned by weak dict!")
1218        items1 = dict.items()
1219        items2 = dict.copy().items()
1220        self.assertEqual(set(items1), set(items2),
1221                     "cloning of weak-keyed dictionary did not work!")
1222        del items1, items2
1223        self.assertEqual(len(dict), self.COUNT)
1224        del objects[0]
1225        self.assertEqual(len(dict), (self.COUNT - 1),
1226                     "deleting object did not cause dictionary update")
1227        del objects, o
1228        self.assertEqual(len(dict), 0,
1229                     "deleting the keys did not clear the dictionary")
1230        o = Object(42)
1231        dict[o] = "What is the meaning of the universe?"
1232        self.assertIn(o, dict)
1233        self.assertNotIn(34, dict)
1234
1235    def test_weak_keyed_iters(self):
1236        dict, objects = self.make_weak_keyed_dict()
1237        self.check_iters(dict)
1238
1239        # Test keyrefs()
1240        refs = dict.keyrefs()
1241        self.assertEqual(len(refs), len(objects))
1242        objects2 = list(objects)
1243        for wr in refs:
1244            ob = wr()
1245            self.assertIn(ob, dict)
1246            self.assertIn(ob, dict)
1247            self.assertEqual(ob.arg, dict[ob])
1248            objects2.remove(ob)
1249        self.assertEqual(len(objects2), 0)
1250
1251        # Test iterkeyrefs()
1252        objects2 = list(objects)
1253        self.assertEqual(len(list(dict.keyrefs())), len(objects))
1254        for wr in dict.keyrefs():
1255            ob = wr()
1256            self.assertIn(ob, dict)
1257            self.assertIn(ob, dict)
1258            self.assertEqual(ob.arg, dict[ob])
1259            objects2.remove(ob)
1260        self.assertEqual(len(objects2), 0)
1261
1262    def test_weak_valued_iters(self):
1263        dict, objects = self.make_weak_valued_dict()
1264        self.check_iters(dict)
1265
1266        # Test valuerefs()
1267        refs = dict.valuerefs()
1268        self.assertEqual(len(refs), len(objects))
1269        objects2 = list(objects)
1270        for wr in refs:
1271            ob = wr()
1272            self.assertEqual(ob, dict[ob.arg])
1273            self.assertEqual(ob.arg, dict[ob.arg].arg)
1274            objects2.remove(ob)
1275        self.assertEqual(len(objects2), 0)
1276
1277        # Test itervaluerefs()
1278        objects2 = list(objects)
1279        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1280        for wr in dict.itervaluerefs():
1281            ob = wr()
1282            self.assertEqual(ob, dict[ob.arg])
1283            self.assertEqual(ob.arg, dict[ob.arg].arg)
1284            objects2.remove(ob)
1285        self.assertEqual(len(objects2), 0)
1286
1287    def check_iters(self, dict):
1288        # item iterator:
1289        items = list(dict.items())
1290        for item in dict.items():
1291            items.remove(item)
1292        self.assertFalse(items, "items() did not touch all items")
1293
1294        # key iterator, via __iter__():
1295        keys = list(dict.keys())
1296        for k in dict:
1297            keys.remove(k)
1298        self.assertFalse(keys, "__iter__() did not touch all keys")
1299
1300        # key iterator, via iterkeys():
1301        keys = list(dict.keys())
1302        for k in dict.keys():
1303            keys.remove(k)
1304        self.assertFalse(keys, "iterkeys() did not touch all keys")
1305
1306        # value iterator:
1307        values = list(dict.values())
1308        for v in dict.values():
1309            values.remove(v)
1310        self.assertFalse(values,
1311                     "itervalues() did not touch all values")
1312
1313    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
1314        n = len(dict)
1315        it = iter(getattr(dict, iter_name)())
1316        next(it)             # Trigger internal iteration
1317        # Destroy an object
1318        del objects[-1]
1319        gc.collect()    # just in case
1320        # We have removed either the first consumed object, or another one
1321        self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
1322        del it
1323        # The removal has been committed
1324        self.assertEqual(len(dict), n - 1)
1325
1326    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
1327        # Check that we can explicitly mutate the weak dict without
1328        # interfering with delayed removal.
1329        # `testcontext` should create an iterator, destroy one of the
1330        # weakref'ed objects and then return a new key/value pair corresponding
1331        # to the destroyed object.
1332        with testcontext() as (k, v):
1333            self.assertNotIn(k, dict)
1334        with testcontext() as (k, v):
1335            self.assertRaises(KeyError, dict.__delitem__, k)
1336        self.assertNotIn(k, dict)
1337        with testcontext() as (k, v):
1338            self.assertRaises(KeyError, dict.pop, k)
1339        self.assertNotIn(k, dict)
1340        with testcontext() as (k, v):
1341            dict[k] = v
1342        self.assertEqual(dict[k], v)
1343        ddict = copy.copy(dict)
1344        with testcontext() as (k, v):
1345            dict.update(ddict)
1346        self.assertEqual(dict, ddict)
1347        with testcontext() as (k, v):
1348            dict.clear()
1349        self.assertEqual(len(dict), 0)
1350
1351    def check_weak_del_and_len_while_iterating(self, dict, testcontext):
1352        # Check that len() works when both iterating and removing keys
1353        # explicitly through various means (.pop(), .clear()...), while
1354        # implicit mutation is deferred because an iterator is alive.
1355        # (each call to testcontext() should schedule one item for removal
1356        #  for this test to work properly)
1357        o = Object(123456)
1358        with testcontext():
1359            n = len(dict)
1360            # Since underlaying dict is ordered, first item is popped
1361            dict.pop(next(dict.keys()))
1362            self.assertEqual(len(dict), n - 1)
1363            dict[o] = o
1364            self.assertEqual(len(dict), n)
1365        # last item in objects is removed from dict in context shutdown
1366        with testcontext():
1367            self.assertEqual(len(dict), n - 1)
1368            # Then, (o, o) is popped
1369            dict.popitem()
1370            self.assertEqual(len(dict), n - 2)
1371        with testcontext():
1372            self.assertEqual(len(dict), n - 3)
1373            del dict[next(dict.keys())]
1374            self.assertEqual(len(dict), n - 4)
1375        with testcontext():
1376            self.assertEqual(len(dict), n - 5)
1377            dict.popitem()
1378            self.assertEqual(len(dict), n - 6)
1379        with testcontext():
1380            dict.clear()
1381            self.assertEqual(len(dict), 0)
1382        self.assertEqual(len(dict), 0)
1383
1384    def test_weak_keys_destroy_while_iterating(self):
1385        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1386        dict, objects = self.make_weak_keyed_dict()
1387        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1388        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1389        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1390        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
1391        dict, objects = self.make_weak_keyed_dict()
1392        @contextlib.contextmanager
1393        def testcontext():
1394            try:
1395                it = iter(dict.items())
1396                next(it)
1397                # Schedule a key/value for removal and recreate it
1398                v = objects.pop().arg
1399                gc.collect()      # just in case
1400                yield Object(v), v
1401            finally:
1402                it = None           # should commit all removals
1403                gc.collect()
1404        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1405        # Issue #21173: len() fragile when keys are both implicitly and
1406        # explicitly removed.
1407        dict, objects = self.make_weak_keyed_dict()
1408        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1409
1410    def test_weak_values_destroy_while_iterating(self):
1411        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1412        dict, objects = self.make_weak_valued_dict()
1413        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1414        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1415        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1416        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
1417        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
1418        dict, objects = self.make_weak_valued_dict()
1419        @contextlib.contextmanager
1420        def testcontext():
1421            try:
1422                it = iter(dict.items())
1423                next(it)
1424                # Schedule a key/value for removal and recreate it
1425                k = objects.pop().arg
1426                gc.collect()      # just in case
1427                yield k, Object(k)
1428            finally:
1429                it = None           # should commit all removals
1430                gc.collect()
1431        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1432        dict, objects = self.make_weak_valued_dict()
1433        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1434
1435    def test_make_weak_keyed_dict_from_dict(self):
1436        o = Object(3)
1437        dict = weakref.WeakKeyDictionary({o:364})
1438        self.assertEqual(dict[o], 364)
1439
1440    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1441        o = Object(3)
1442        dict = weakref.WeakKeyDictionary({o:364})
1443        dict2 = weakref.WeakKeyDictionary(dict)
1444        self.assertEqual(dict[o], 364)
1445
1446    def make_weak_keyed_dict(self):
1447        dict = weakref.WeakKeyDictionary()
1448        objects = list(map(Object, range(self.COUNT)))
1449        for o in objects:
1450            dict[o] = o.arg
1451        return dict, objects
1452
1453    def test_make_weak_valued_dict_from_dict(self):
1454        o = Object(3)
1455        dict = weakref.WeakValueDictionary({364:o})
1456        self.assertEqual(dict[364], o)
1457
1458    def test_make_weak_valued_dict_from_weak_valued_dict(self):
1459        o = Object(3)
1460        dict = weakref.WeakValueDictionary({364:o})
1461        dict2 = weakref.WeakValueDictionary(dict)
1462        self.assertEqual(dict[364], o)
1463
1464    def test_make_weak_valued_dict_misc(self):
1465        # errors
1466        self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
1467        self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
1468        self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
1469        # special keyword arguments
1470        o = Object(3)
1471        for kw in 'self', 'dict', 'other', 'iterable':
1472            d = weakref.WeakValueDictionary(**{kw: o})
1473            self.assertEqual(list(d.keys()), [kw])
1474            self.assertEqual(d[kw], o)
1475
1476    def make_weak_valued_dict(self):
1477        dict = weakref.WeakValueDictionary()
1478        objects = list(map(Object, range(self.COUNT)))
1479        for o in objects:
1480            dict[o.arg] = o
1481        return dict, objects
1482
1483    def check_popitem(self, klass, key1, value1, key2, value2):
1484        weakdict = klass()
1485        weakdict[key1] = value1
1486        weakdict[key2] = value2
1487        self.assertEqual(len(weakdict), 2)
1488        k, v = weakdict.popitem()
1489        self.assertEqual(len(weakdict), 1)
1490        if k is key1:
1491            self.assertIs(v, value1)
1492        else:
1493            self.assertIs(v, value2)
1494        k, v = weakdict.popitem()
1495        self.assertEqual(len(weakdict), 0)
1496        if k is key1:
1497            self.assertIs(v, value1)
1498        else:
1499            self.assertIs(v, value2)
1500
1501    def test_weak_valued_dict_popitem(self):
1502        self.check_popitem(weakref.WeakValueDictionary,
1503                           "key1", C(), "key2", C())
1504
1505    def test_weak_keyed_dict_popitem(self):
1506        self.check_popitem(weakref.WeakKeyDictionary,
1507                           C(), "value 1", C(), "value 2")
1508
1509    def check_setdefault(self, klass, key, value1, value2):
1510        self.assertIsNot(value1, value2,
1511                     "invalid test"
1512                     " -- value parameters must be distinct objects")
1513        weakdict = klass()
1514        o = weakdict.setdefault(key, value1)
1515        self.assertIs(o, value1)
1516        self.assertIn(key, weakdict)
1517        self.assertIs(weakdict.get(key), value1)
1518        self.assertIs(weakdict[key], value1)
1519
1520        o = weakdict.setdefault(key, value2)
1521        self.assertIs(o, value1)
1522        self.assertIn(key, weakdict)
1523        self.assertIs(weakdict.get(key), value1)
1524        self.assertIs(weakdict[key], value1)
1525
1526    def test_weak_valued_dict_setdefault(self):
1527        self.check_setdefault(weakref.WeakValueDictionary,
1528                              "key", C(), C())
1529
1530    def test_weak_keyed_dict_setdefault(self):
1531        self.check_setdefault(weakref.WeakKeyDictionary,
1532                              C(), "value 1", "value 2")
1533
1534    def check_update(self, klass, dict):
1535        #
1536        #  This exercises d.update(), len(d), d.keys(), k in d,
1537        #  d.get(), d[].
1538        #
1539        weakdict = klass()
1540        weakdict.update(dict)
1541        self.assertEqual(len(weakdict), len(dict))
1542        for k in weakdict.keys():
1543            self.assertIn(k, dict, "mysterious new key appeared in weak dict")
1544            v = dict.get(k)
1545            self.assertIs(v, weakdict[k])
1546            self.assertIs(v, weakdict.get(k))
1547        for k in dict.keys():
1548            self.assertIn(k, weakdict, "original key disappeared in weak dict")
1549            v = dict[k]
1550            self.assertIs(v, weakdict[k])
1551            self.assertIs(v, weakdict.get(k))
1552
1553    def test_weak_valued_dict_update(self):
1554        self.check_update(weakref.WeakValueDictionary,
1555                          {1: C(), 'a': C(), C(): C()})
1556        # errors
1557        self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
1558        d = weakref.WeakValueDictionary()
1559        self.assertRaises(TypeError, d.update, {}, {})
1560        self.assertRaises(TypeError, d.update, (), ())
1561        self.assertEqual(list(d.keys()), [])
1562        # special keyword arguments
1563        o = Object(3)
1564        for kw in 'self', 'dict', 'other', 'iterable':
1565            d = weakref.WeakValueDictionary()
1566            d.update(**{kw: o})
1567            self.assertEqual(list(d.keys()), [kw])
1568            self.assertEqual(d[kw], o)
1569
1570    def test_weak_keyed_dict_update(self):
1571        self.check_update(weakref.WeakKeyDictionary,
1572                          {C(): 1, C(): 2, C(): 3})
1573
1574    def test_weak_keyed_delitem(self):
1575        d = weakref.WeakKeyDictionary()
1576        o1 = Object('1')
1577        o2 = Object('2')
1578        d[o1] = 'something'
1579        d[o2] = 'something'
1580        self.assertEqual(len(d), 2)
1581        del d[o1]
1582        self.assertEqual(len(d), 1)
1583        self.assertEqual(list(d.keys()), [o2])
1584
1585    def test_weak_valued_delitem(self):
1586        d = weakref.WeakValueDictionary()
1587        o1 = Object('1')
1588        o2 = Object('2')
1589        d['something'] = o1
1590        d['something else'] = o2
1591        self.assertEqual(len(d), 2)
1592        del d['something']
1593        self.assertEqual(len(d), 1)
1594        self.assertEqual(list(d.items()), [('something else', o2)])
1595
1596    def test_weak_keyed_bad_delitem(self):
1597        d = weakref.WeakKeyDictionary()
1598        o = Object('1')
1599        # An attempt to delete an object that isn't there should raise
1600        # KeyError.  It didn't before 2.3.
1601        self.assertRaises(KeyError, d.__delitem__, o)
1602        self.assertRaises(KeyError, d.__getitem__, o)
1603
1604        # If a key isn't of a weakly referencable type, __getitem__ and
1605        # __setitem__ raise TypeError.  __delitem__ should too.
1606        self.assertRaises(TypeError, d.__delitem__,  13)
1607        self.assertRaises(TypeError, d.__getitem__,  13)
1608        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1609
1610    def test_weak_keyed_cascading_deletes(self):
1611        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
1612        # over the keys via self.data.iterkeys().  If things vanished from
1613        # the dict during this (or got added), that caused a RuntimeError.
1614
1615        d = weakref.WeakKeyDictionary()
1616        mutate = False
1617
1618        class C(object):
1619            def __init__(self, i):
1620                self.value = i
1621            def __hash__(self):
1622                return hash(self.value)
1623            def __eq__(self, other):
1624                if mutate:
1625                    # Side effect that mutates the dict, by removing the
1626                    # last strong reference to a key.
1627                    del objs[-1]
1628                return self.value == other.value
1629
1630        objs = [C(i) for i in range(4)]
1631        for o in objs:
1632            d[o] = o.value
1633        del o   # now the only strong references to keys are in objs
1634        # Find the order in which iterkeys sees the keys.
1635        objs = list(d.keys())
1636        # Reverse it, so that the iteration implementation of __delitem__
1637        # has to keep looping to find the first object we delete.
1638        objs.reverse()
1639
1640        # Turn on mutation in C.__eq__.  The first time thru the loop,
1641        # under the iterkeys() business the first comparison will delete
1642        # the last item iterkeys() would see, and that causes a
1643        #     RuntimeError: dictionary changed size during iteration
1644        # when the iterkeys() loop goes around to try comparing the next
1645        # key.  After this was fixed, it just deletes the last object *our*
1646        # "for o in obj" loop would have gotten to.
1647        mutate = True
1648        count = 0
1649        for o in objs:
1650            count += 1
1651            del d[o]
1652        self.assertEqual(len(d), 0)
1653        self.assertEqual(count, 2)
1654
1655    def test_make_weak_valued_dict_repr(self):
1656        dict = weakref.WeakValueDictionary()
1657        self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
1658
1659    def test_make_weak_keyed_dict_repr(self):
1660        dict = weakref.WeakKeyDictionary()
1661        self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
1662
1663    def test_threaded_weak_valued_setdefault(self):
1664        d = weakref.WeakValueDictionary()
1665        with collect_in_thread():
1666            for i in range(100000):
1667                x = d.setdefault(10, RefCycle())
1668                self.assertIsNot(x, None)  # we never put None in there!
1669                del x
1670
1671    def test_threaded_weak_valued_pop(self):
1672        d = weakref.WeakValueDictionary()
1673        with collect_in_thread():
1674            for i in range(100000):
1675                d[10] = RefCycle()
1676                x = d.pop(10, 10)
1677                self.assertIsNot(x, None)  # we never put None in there!
1678
1679    def test_threaded_weak_valued_consistency(self):
1680        # Issue #28427: old keys should not remove new values from
1681        # WeakValueDictionary when collecting from another thread.
1682        d = weakref.WeakValueDictionary()
1683        with collect_in_thread():
1684            for i in range(200000):
1685                o = RefCycle()
1686                d[10] = o
1687                # o is still alive, so the dict can't be empty
1688                self.assertEqual(len(d), 1)
1689                o = None  # lose ref
1690
1691
1692from test import mapping_tests
1693
1694class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1695    """Check that WeakValueDictionary conforms to the mapping protocol"""
1696    __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
1697    type2test = weakref.WeakValueDictionary
1698    def _reference(self):
1699        return self.__ref.copy()
1700
1701class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1702    """Check that WeakKeyDictionary conforms to the mapping protocol"""
1703    __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
1704    type2test = weakref.WeakKeyDictionary
1705    def _reference(self):
1706        return self.__ref.copy()
1707
1708
1709class FinalizeTestCase(unittest.TestCase):
1710
1711    class A:
1712        pass
1713
1714    def _collect_if_necessary(self):
1715        # we create no ref-cycles so in CPython no gc should be needed
1716        if sys.implementation.name != 'cpython':
1717            support.gc_collect()
1718
1719    def test_finalize(self):
1720        def add(x,y,z):
1721            res.append(x + y + z)
1722            return x + y + z
1723
1724        a = self.A()
1725
1726        res = []
1727        f = weakref.finalize(a, add, 67, 43, z=89)
1728        self.assertEqual(f.alive, True)
1729        self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
1730        self.assertEqual(f(), 199)
1731        self.assertEqual(f(), None)
1732        self.assertEqual(f(), None)
1733        self.assertEqual(f.peek(), None)
1734        self.assertEqual(f.detach(), None)
1735        self.assertEqual(f.alive, False)
1736        self.assertEqual(res, [199])
1737
1738        res = []
1739        f = weakref.finalize(a, add, 67, 43, 89)
1740        self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
1741        self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
1742        self.assertEqual(f(), None)
1743        self.assertEqual(f(), None)
1744        self.assertEqual(f.peek(), None)
1745        self.assertEqual(f.detach(), None)
1746        self.assertEqual(f.alive, False)
1747        self.assertEqual(res, [])
1748
1749        res = []
1750        f = weakref.finalize(a, add, x=67, y=43, z=89)
1751        del a
1752        self._collect_if_necessary()
1753        self.assertEqual(f(), None)
1754        self.assertEqual(f(), None)
1755        self.assertEqual(f.peek(), None)
1756        self.assertEqual(f.detach(), None)
1757        self.assertEqual(f.alive, False)
1758        self.assertEqual(res, [199])
1759
1760    def test_order(self):
1761        a = self.A()
1762        res = []
1763
1764        f1 = weakref.finalize(a, res.append, 'f1')
1765        f2 = weakref.finalize(a, res.append, 'f2')
1766        f3 = weakref.finalize(a, res.append, 'f3')
1767        f4 = weakref.finalize(a, res.append, 'f4')
1768        f5 = weakref.finalize(a, res.append, 'f5')
1769
1770        # make sure finalizers can keep themselves alive
1771        del f1, f4
1772
1773        self.assertTrue(f2.alive)
1774        self.assertTrue(f3.alive)
1775        self.assertTrue(f5.alive)
1776
1777        self.assertTrue(f5.detach())
1778        self.assertFalse(f5.alive)
1779
1780        f5()                       # nothing because previously unregistered
1781        res.append('A')
1782        f3()                       # => res.append('f3')
1783        self.assertFalse(f3.alive)
1784        res.append('B')
1785        f3()                       # nothing because previously called
1786        res.append('C')
1787        del a
1788        self._collect_if_necessary()
1789                                   # => res.append('f4')
1790                                   # => res.append('f2')
1791                                   # => res.append('f1')
1792        self.assertFalse(f2.alive)
1793        res.append('D')
1794        f2()                       # nothing because previously called by gc
1795
1796        expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
1797        self.assertEqual(res, expected)
1798
1799    def test_all_freed(self):
1800        # we want a weakrefable subclass of weakref.finalize
1801        class MyFinalizer(weakref.finalize):
1802            pass
1803
1804        a = self.A()
1805        res = []
1806        def callback():
1807            res.append(123)
1808        f = MyFinalizer(a, callback)
1809
1810        wr_callback = weakref.ref(callback)
1811        wr_f = weakref.ref(f)
1812        del callback, f
1813
1814        self.assertIsNotNone(wr_callback())
1815        self.assertIsNotNone(wr_f())
1816
1817        del a
1818        self._collect_if_necessary()
1819
1820        self.assertIsNone(wr_callback())
1821        self.assertIsNone(wr_f())
1822        self.assertEqual(res, [123])
1823
1824    @classmethod
1825    def run_in_child(cls):
1826        def error():
1827            # Create an atexit finalizer from inside a finalizer called
1828            # at exit.  This should be the next to be run.
1829            g1 = weakref.finalize(cls, print, 'g1')
1830            print('f3 error')
1831            1/0
1832
1833        # cls should stay alive till atexit callbacks run
1834        f1 = weakref.finalize(cls, print, 'f1', _global_var)
1835        f2 = weakref.finalize(cls, print, 'f2', _global_var)
1836        f3 = weakref.finalize(cls, error)
1837        f4 = weakref.finalize(cls, print, 'f4', _global_var)
1838
1839        assert f1.atexit == True
1840        f2.atexit = False
1841        assert f3.atexit == True
1842        assert f4.atexit == True
1843
1844    def test_atexit(self):
1845        prog = ('from test.test_weakref import FinalizeTestCase;'+
1846                'FinalizeTestCase.run_in_child()')
1847        rc, out, err = script_helper.assert_python_ok('-c', prog)
1848        out = out.decode('ascii').splitlines()
1849        self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
1850        self.assertTrue(b'ZeroDivisionError' in err)
1851
1852
1853libreftest = """ Doctest for examples in the library reference: weakref.rst
1854
1855>>> import weakref
1856>>> class Dict(dict):
1857...     pass
1858...
1859>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
1860>>> r = weakref.ref(obj)
1861>>> print(r() is obj)
1862True
1863
1864>>> import weakref
1865>>> class Object:
1866...     pass
1867...
1868>>> o = Object()
1869>>> r = weakref.ref(o)
1870>>> o2 = r()
1871>>> o is o2
1872True
1873>>> del o, o2
1874>>> print(r())
1875None
1876
1877>>> import weakref
1878>>> class ExtendedRef(weakref.ref):
1879...     def __init__(self, ob, callback=None, **annotations):
1880...         super().__init__(ob, callback)
1881...         self.__counter = 0
1882...         for k, v in annotations.items():
1883...             setattr(self, k, v)
1884...     def __call__(self):
1885...         '''Return a pair containing the referent and the number of
1886...         times the reference has been called.
1887...         '''
1888...         ob = super().__call__()
1889...         if ob is not None:
1890...             self.__counter += 1
1891...             ob = (ob, self.__counter)
1892...         return ob
1893...
1894>>> class A:   # not in docs from here, just testing the ExtendedRef
1895...     pass
1896...
1897>>> a = A()
1898>>> r = ExtendedRef(a, foo=1, bar="baz")
1899>>> r.foo
19001
1901>>> r.bar
1902'baz'
1903>>> r()[1]
19041
1905>>> r()[1]
19062
1907>>> r()[0] is a
1908True
1909
1910
1911>>> import weakref
1912>>> _id2obj_dict = weakref.WeakValueDictionary()
1913>>> def remember(obj):
1914...     oid = id(obj)
1915...     _id2obj_dict[oid] = obj
1916...     return oid
1917...
1918>>> def id2obj(oid):
1919...     return _id2obj_dict[oid]
1920...
1921>>> a = A()             # from here, just testing
1922>>> a_id = remember(a)
1923>>> id2obj(a_id) is a
1924True
1925>>> del a
1926>>> try:
1927...     id2obj(a_id)
1928... except KeyError:
1929...     print('OK')
1930... else:
1931...     print('WeakValueDictionary error')
1932OK
1933
1934"""
1935
1936__test__ = {'libreftest' : libreftest}
1937
1938def test_main():
1939    support.run_unittest(
1940        ReferencesTestCase,
1941        WeakMethodTestCase,
1942        MappingTestCase,
1943        WeakValueDictionaryTestCase,
1944        WeakKeyDictionaryTestCase,
1945        SubclassableWeakrefTestCase,
1946        FinalizeTestCase,
1947        )
1948    support.run_doctest(sys.modules[__name__])
1949
1950
1951if __name__ == "__main__":
1952    test_main()
1953