1import gc
2import sys
3import unittest
4import UserList
5import weakref
6import operator
7
8from test import test_support
9
10# Used in ReferencesTestCase.test_ref_created_during_del() .
11ref_from_del = None
12
class C:
    """Minimal old-style class; its instances and methods are weakref targets."""

    def method(self):
        """Do nothing; exists so bound and unbound methods can be created."""
        return None
16
17
class Callable:
    """Old-style callable that records the argument of its latest call."""

    # Value passed to the most recent __call__; None until the first call.
    bar = None

    def __call__(self, x):
        """Store *x* on the instance as ``bar``."""
        setattr(self, 'bar', x)
23
24
def create_function():
    """Return a brand-new do-nothing function object on every call."""
    def f():
        return None
    return f
28
def create_bound_method():
    """Return ``method`` bound to a freshly created C instance."""
    bound = C().method
    return bound
31
def create_unbound_method():
    """Return ``method`` looked up on the class C itself (no instance)."""
    return getattr(C, 'method')
34
35
class Object:
    """Value wrapper whose equality, inequality and hash follow ``arg``."""

    def __init__(self, arg):
        self.arg = arg

    def __repr__(self):
        return "<Object %r>" % self.arg

    def __eq__(self, other):
        # Only another Object is comparable; defer to the other operand
        # for anything else.
        if not isinstance(other, Object):
            return NotImplemented
        return self.arg == other.arg

    def __ne__(self, other):
        if not isinstance(other, Object):
            return NotImplemented
        return self.arg != other.arg

    def __hash__(self):
        # Hash like the wrapped value so that equal Objects hash equally.
        return hash(self.arg)
51
class RefCycle:
    """Object holding a reference to itself, i.e. a one-object cycle."""

    def __init__(self):
        # The self-reference is what makes every instance cyclic.
        setattr(self, 'cycle', self)
55
56
class TestBase(unittest.TestCase):
    """Shared fixture that counts weakref callback invocations."""

    def setUp(self):
        # Every test starts with a clean callback tally.
        self.cbcalled = 0

    def callback(self, ref):
        """Weakref callback: *ref* is ignored, only the call is counted."""
        self.cbcalled = self.cbcalled + 1
64
65
66class ReferencesTestCase(TestBase):
67
68    def test_basic_ref(self):
69        self.check_basic_ref(C)
70        self.check_basic_ref(create_function)
71        self.check_basic_ref(create_bound_method)
72        self.check_basic_ref(create_unbound_method)
73
74        # Just make sure the tp_repr handler doesn't raise an exception.
75        # Live reference:
76        o = C()
77        wr = weakref.ref(o)
78        repr(wr)
79        # Dead reference:
80        del o
81        repr(wr)
82
83    def test_basic_callback(self):
84        self.check_basic_callback(C)
85        self.check_basic_callback(create_function)
86        self.check_basic_callback(create_bound_method)
87        self.check_basic_callback(create_unbound_method)
88
89    def test_multiple_callbacks(self):
90        o = C()
91        ref1 = weakref.ref(o, self.callback)
92        ref2 = weakref.ref(o, self.callback)
93        del o
94        self.assertTrue(ref1() is None,
95                     "expected reference to be invalidated")
96        self.assertTrue(ref2() is None,
97                     "expected reference to be invalidated")
98        self.assertTrue(self.cbcalled == 2,
99                     "callback not called the right number of times")
100
101    def test_multiple_selfref_callbacks(self):
102        # Make sure all references are invalidated before callbacks are called
103        #
104        # What's important here is that we're using the first
105        # reference in the callback invoked on the second reference
106        # (the most recently created ref is cleaned up first).  This
107        # tests that all references to the object are invalidated
108        # before any of the callbacks are invoked, so that we only
109        # have one invocation of _weakref.c:cleanup_helper() active
110        # for a particular object at a time.
111        #
112        def callback(object, self=self):
113            self.ref()
114        c = C()
115        self.ref = weakref.ref(c, callback)
116        ref1 = weakref.ref(c, callback)
117        del c
118
119    def test_proxy_ref(self):
120        o = C()
121        o.bar = 1
122        ref1 = weakref.proxy(o, self.callback)
123        ref2 = weakref.proxy(o, self.callback)
124        del o
125
126        def check(proxy):
127            proxy.bar
128
129        self.assertRaises(weakref.ReferenceError, check, ref1)
130        self.assertRaises(weakref.ReferenceError, check, ref2)
131        self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
132        self.assertTrue(self.cbcalled == 2)
133
134    def check_basic_ref(self, factory):
135        o = factory()
136        ref = weakref.ref(o)
137        self.assertTrue(ref() is not None,
138                     "weak reference to live object should be live")
139        o2 = ref()
140        self.assertTrue(o is o2,
141                     "<ref>() should return original object if live")
142
143    def check_basic_callback(self, factory):
144        self.cbcalled = 0
145        o = factory()
146        ref = weakref.ref(o, self.callback)
147        del o
148        self.assertTrue(self.cbcalled == 1,
149                     "callback did not properly set 'cbcalled'")
150        self.assertTrue(ref() is None,
151                     "ref2 should be dead after deleting object reference")
152
153    def test_ref_reuse(self):
154        o = C()
155        ref1 = weakref.ref(o)
156        # create a proxy to make sure that there's an intervening creation
157        # between these two; it should make no difference
158        proxy = weakref.proxy(o)
159        ref2 = weakref.ref(o)
160        self.assertTrue(ref1 is ref2,
161                     "reference object w/out callback should be re-used")
162
163        o = C()
164        proxy = weakref.proxy(o)
165        ref1 = weakref.ref(o)
166        ref2 = weakref.ref(o)
167        self.assertTrue(ref1 is ref2,
168                     "reference object w/out callback should be re-used")
169        self.assertTrue(weakref.getweakrefcount(o) == 2,
170                     "wrong weak ref count for object")
171        del proxy
172        self.assertTrue(weakref.getweakrefcount(o) == 1,
173                     "wrong weak ref count for object after deleting proxy")
174
175    def test_proxy_reuse(self):
176        o = C()
177        proxy1 = weakref.proxy(o)
178        ref = weakref.ref(o)
179        proxy2 = weakref.proxy(o)
180        self.assertTrue(proxy1 is proxy2,
181                     "proxy object w/out callback should have been re-used")
182
183    def test_basic_proxy(self):
184        o = C()
185        self.check_proxy(o, weakref.proxy(o))
186
187        L = UserList.UserList()
188        p = weakref.proxy(L)
189        self.assertFalse(p, "proxy for empty UserList should be false")
190        p.append(12)
191        self.assertEqual(len(L), 1)
192        self.assertTrue(p, "proxy for non-empty UserList should be true")
193        with test_support.check_py3k_warnings():
194            p[:] = [2, 3]
195        self.assertEqual(len(L), 2)
196        self.assertEqual(len(p), 2)
197        self.assertIn(3, p, "proxy didn't support __contains__() properly")
198        p[1] = 5
199        self.assertEqual(L[1], 5)
200        self.assertEqual(p[1], 5)
201        L2 = UserList.UserList(L)
202        p2 = weakref.proxy(L2)
203        self.assertEqual(p, p2)
204        ## self.assertEqual(repr(L2), repr(p2))
205        L3 = UserList.UserList(range(10))
206        p3 = weakref.proxy(L3)
207        with test_support.check_py3k_warnings():
208            self.assertEqual(L3[:], p3[:])
209            self.assertEqual(L3[5:], p3[5:])
210            self.assertEqual(L3[:5], p3[:5])
211            self.assertEqual(L3[2:5], p3[2:5])
212
213    def test_proxy_unicode(self):
214        # See bug 5037
215        class C(object):
216            def __str__(self):
217                return "string"
218            def __unicode__(self):
219                return u"unicode"
220        instance = C()
221        self.assertIn("__unicode__", dir(weakref.proxy(instance)))
222        self.assertEqual(unicode(weakref.proxy(instance)), u"unicode")
223
224    def test_proxy_index(self):
225        class C:
226            def __index__(self):
227                return 10
228        o = C()
229        p = weakref.proxy(o)
230        self.assertEqual(operator.index(p), 10)
231
232    def test_proxy_div(self):
233        class C:
234            def __floordiv__(self, other):
235                return 42
236            def __ifloordiv__(self, other):
237                return 21
238        o = C()
239        p = weakref.proxy(o)
240        self.assertEqual(p // 5, 42)
241        p //= 5
242        self.assertEqual(p, 21)
243
    # The PyWeakref_* C API is documented as allowing either NULL or
    # None as the value for the callback, where either means "no
    # callback".  The "no callback" ref and proxy objects are supposed
    # to be shared by all callers for as long as they exist.  In
    # Python 2.3.3 and earlier, this guarantee was not honored, and
    # was broken in different ways for PyWeakref_NewRef() and
    # PyWeakref_NewProxy().  (Two tests.)
251
252    def test_shared_ref_without_callback(self):
253        self.check_shared_without_callback(weakref.ref)
254
255    def test_shared_proxy_without_callback(self):
256        self.check_shared_without_callback(weakref.proxy)
257
258    def check_shared_without_callback(self, makeref):
259        o = Object(1)
260        p1 = makeref(o, None)
261        p2 = makeref(o, None)
262        self.assertTrue(p1 is p2, "both callbacks were None in the C API")
263        del p1, p2
264        p1 = makeref(o)
265        p2 = makeref(o, None)
266        self.assertTrue(p1 is p2, "callbacks were NULL, None in the C API")
267        del p1, p2
268        p1 = makeref(o)
269        p2 = makeref(o)
270        self.assertTrue(p1 is p2, "both callbacks were NULL in the C API")
271        del p1, p2
272        p1 = makeref(o, None)
273        p2 = makeref(o)
274        self.assertTrue(p1 is p2, "callbacks were None, NULL in the C API")
275
276    def test_callable_proxy(self):
277        o = Callable()
278        ref1 = weakref.proxy(o)
279
280        self.check_proxy(o, ref1)
281
282        self.assertTrue(type(ref1) is weakref.CallableProxyType,
283                     "proxy is not of callable type")
284        ref1('twinkies!')
285        self.assertTrue(o.bar == 'twinkies!',
286                     "call through proxy not passed through to original")
287        ref1(x='Splat.')
288        self.assertTrue(o.bar == 'Splat.',
289                     "call through proxy not passed through to original")
290
291        # expect due to too few args
292        self.assertRaises(TypeError, ref1)
293
294        # expect due to too many args
295        self.assertRaises(TypeError, ref1, 1, 2, 3)
296
297    def check_proxy(self, o, proxy):
298        o.foo = 1
299        self.assertTrue(proxy.foo == 1,
300                     "proxy does not reflect attribute addition")
301        o.foo = 2
302        self.assertTrue(proxy.foo == 2,
303                     "proxy does not reflect attribute modification")
304        del o.foo
305        self.assertTrue(not hasattr(proxy, 'foo'),
306                     "proxy does not reflect attribute removal")
307
308        proxy.foo = 1
309        self.assertTrue(o.foo == 1,
310                     "object does not reflect attribute addition via proxy")
311        proxy.foo = 2
312        self.assertTrue(
313            o.foo == 2,
314            "object does not reflect attribute modification via proxy")
315        del proxy.foo
316        self.assertTrue(not hasattr(o, 'foo'),
317                     "object does not reflect attribute removal via proxy")
318
319    def test_proxy_deletion(self):
320        # Test clearing of SF bug #762891
321        class Foo:
322            result = None
323            def __delitem__(self, accessor):
324                self.result = accessor
325        g = Foo()
326        f = weakref.proxy(g)
327        del f[0]
328        self.assertEqual(f.result, 0)
329
330    def test_proxy_bool(self):
331        # Test clearing of SF bug #1170766
332        class List(list): pass
333        lyst = List()
334        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
335
336    def test_getweakrefcount(self):
337        o = C()
338        ref1 = weakref.ref(o)
339        ref2 = weakref.ref(o, self.callback)
340        self.assertTrue(weakref.getweakrefcount(o) == 2,
341                     "got wrong number of weak reference objects")
342
343        proxy1 = weakref.proxy(o)
344        proxy2 = weakref.proxy(o, self.callback)
345        self.assertTrue(weakref.getweakrefcount(o) == 4,
346                     "got wrong number of weak reference objects")
347
348        del ref1, ref2, proxy1, proxy2
349        self.assertTrue(weakref.getweakrefcount(o) == 0,
350                     "weak reference objects not unlinked from"
351                     " referent when discarded.")
352
353        # assumes ints do not support weakrefs
354        self.assertTrue(weakref.getweakrefcount(1) == 0,
355                     "got wrong number of weak reference objects for int")
356
357    def test_getweakrefs(self):
358        o = C()
359        ref1 = weakref.ref(o, self.callback)
360        ref2 = weakref.ref(o, self.callback)
361        del ref1
362        self.assertTrue(weakref.getweakrefs(o) == [ref2],
363                     "list of refs does not match")
364
365        o = C()
366        ref1 = weakref.ref(o, self.callback)
367        ref2 = weakref.ref(o, self.callback)
368        del ref2
369        self.assertTrue(weakref.getweakrefs(o) == [ref1],
370                     "list of refs does not match")
371
372        del ref1
373        self.assertTrue(weakref.getweakrefs(o) == [],
374                     "list of refs not cleared")
375
376        # assumes ints do not support weakrefs
377        self.assertTrue(weakref.getweakrefs(1) == [],
378                     "list of refs does not match for int")
379
380    def test_newstyle_number_ops(self):
381        class F(float):
382            pass
383        f = F(2.0)
384        p = weakref.proxy(f)
385        self.assertTrue(p + 1.0 == 3.0)
386        self.assertTrue(1.0 + p == 3.0)  # this used to SEGV
387
    def test_callbacks_protected(self):
        # Callbacks protected from already-set exceptions?
        # Regression test for SF bug #478534.
        class BogusError(Exception):
            pass
        # Keyed by weakrefs to the throwaway lambdas created below.
        data = {}
        def remove(k):
            # Weakref callback: drop the dead ref's own entry from data.
            del data[k]
        def encapsulate():
            # f is referenced only by this local, and the weakref to it
            # carries the remove callback.
            f = lambda : ()
            data[weakref.ref(f, remove)] = None
            raise BogusError
        # The same scenario is run twice in a row; each time the
        # BogusError raised by encapsulate() must be what reaches us.
        try:
            encapsulate()
        except BogusError:
            pass
        else:
            self.fail("exception not properly restored")
        try:
            encapsulate()
        except BogusError:
            pass
        else:
            self.fail("exception not properly restored")
412
    def test_sf_bug_840829(self):
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.

        # NOTE: gc is also imported at module level; this local import
        # binds the same module inside the method.
        import gc

        class C(object):
            pass

        # Part one: the callback attached to c runs a full collection.
        c = C()
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        c1 = C()
        c1.i = C()
        # Same gc-triggering callback, now on an object that is reachable
        # only through c1's attribute.
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2
452
    def test_callback_in_cycle_1(self):
        # No assertions are made: the test passes if the collection at
        # the end completes without crashing (see the long comment below).
        import gc

        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        # The callback is a bound method of I, and the weakref is stored
        # on I itself.
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()
492
    def test_callback_in_cycle_2(self):
        # No assertions are made; the historical symptom (described below)
        # was an ignored exception reported while collecting.
        import gc

        # This is just like test_callback_in_cycle_1, except that II is an
        # old-style class.  The symptom is different then:  an instance of an
        # old-style class looks in its own __dict__ first.  'J' happens to
        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
        # __dict__, so the attribute isn't found.  The difference is that
        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
        # __mro__), so no segfault occurs.  Instead it got:
        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
        #    Exception exceptions.AttributeError:
        #   "II instance has no attribute 'J'" in <bound method II.acallback
        #       of <?.II instance at 0x00B9B4B8>> ignored

        class J(object):
            pass

        # Old-style on purpose: that is the one difference from cycle_1.
        class II:
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        del I, J, II
        gc.collect()
521
    def test_callback_in_cycle_3(self):
        import gc

        # This one broke the first patch that fixed the last two.  In this
        # case, the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.

        class C:
            def cb(self, ignore):
                # Touch every attribute stored on the instance below.
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        # c2 refers to itself, to c1, and to a weakref on c1 whose callback
        # is c2's own bound method; c1 refers to nothing.
        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2
        gc.collect()
545
    def test_callback_in_cycle_4(self):
        import gc

        # Like test_callback_in_cycle_3, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).

        class C(object):
            def cb(self, ignore):
                self.me
                self.c1
                self.wr

        class D:
            pass

        # c1 is an instance of the unrelated class D; only c2 is a C.
        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        # The local classes are deleted too, along with the instances.
        del c1, c2, C, D
        gc.collect()
573
    def test_callback_in_cycle_resurrection(self):
        import gc

        # Do something nasty in a weakref callback:  resurrect objects
        # from dead cycles.  For this to be attempted, the weakref and
        # its callback must also be part of the cyclic trash (else the
        # objects reachable via the callback couldn't be in cyclic trash
        # to begin with -- the callback would act like an external root).
        # But gc clears trash weakrefs with callbacks early now, which
        # disables the callbacks, so the callbacks shouldn't get called
        # at all (and so nothing actually gets resurrected).

        # Records what the callbacks did, in the order they ran.
        alist = []
        class C(object):
            def __init__(self, value):
                self.attribute = value

            def acallback(self, ignore):
                # Would store a strong reference to the partner object.
                alist.append(self.c)

        # c1 and c2 reference each other, and each holds a weakref to the
        # other whose callback is its own bound method.
        c1, c2 = C(1), C(2)
        c1.c = c2
        c2.c = c1
        c1.wr = weakref.ref(c2, c1.acallback)
        c2.wr = weakref.ref(c1, c2.acallback)

        def C_went_away(ignore):
            alist.append("C went away")
        wr = weakref.ref(C, C_went_away)

        del c1, c2, C   # make them all trash
        self.assertEqual(alist, [])  # del isn't enough to reclaim anything

        gc.collect()
        # c1.wr and c2.wr were part of the cyclic trash, so should have
        # been cleared without their callbacks executing.  OTOH, the weakref
        # to C is bound to a function local (wr), and wasn't trash, so that
        # callback should have been invoked when C went away.
        self.assertEqual(alist, ["C went away"])
        # The remaining weakref should be dead now (its callback ran).
        self.assertEqual(wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
619
    def test_callbacks_on_callback(self):
        import gc

        # Set up weakref callbacks *on* weakref callbacks.
        # alist records which callbacks ran, in order.
        alist = []
        def safe_callback(ignore):
            alist.append("safe_callback called")

        class C(object):
            def cb(self, ignore):
                alist.append("cb called")

        # c and d reference each other.
        c, d = C(), C()
        c.other = d
        d.other = c
        # The bound method c.cb is both a weakref callback (for c.wr) and
        # the referent of two further weakrefs (d.wr and external_wr).
        callback = c.cb
        c.wr = weakref.ref(d, callback)     # this won't trigger
        d.wr = weakref.ref(callback, d.cb)  # ditto
        external_wr = weakref.ref(callback, safe_callback)  # but this will
        self.assertTrue(external_wr() is callback)

        # The weakrefs attached to c and d should get cleared, so that
        # C.cb is never called.  But external_wr isn't part of the cyclic
        # trash, and no cyclic trash is reachable from it, so safe_callback
        # should get invoked when the bound method object callback (c.cb)
        # -- which is itself a callback, and also part of the cyclic trash --
        # gets reclaimed at the end of gc.

        del callback, c, d, C
        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
        gc.collect()
        self.assertEqual(alist, ["safe_callback called"])
        self.assertEqual(external_wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
657
658    def test_gc_during_ref_creation(self):
659        self.check_gc_during_creation(weakref.ref)
660
661    def test_gc_during_proxy_creation(self):
662        self.check_gc_during_creation(weakref.proxy)
663
664    def check_gc_during_creation(self, makeref):
665        thresholds = gc.get_threshold()
666        gc.set_threshold(1, 1, 1)
667        gc.collect()
668        class A:
669            pass
670
671        def callback(*args):
672            pass
673
674        referenced = A()
675
676        a = A()
677        a.a = a
678        a.wr = makeref(referenced)
679
680        try:
681            # now make sure the object and the ref get labeled as
682            # cyclic trash:
683            a = A()
684            weakref.ref(referenced, callback)
685
686        finally:
687            gc.set_threshold(*thresholds)
688
689    def test_ref_created_during_del(self):
690        # Bug #1377858
691        # A weakref created in an object's __del__() would crash the
692        # interpreter when the weakref was cleaned up since it would refer to
693        # non-existent memory.  This test should not segfault the interpreter.
694        class Target(object):
695            def __del__(self):
696                global ref_from_del
697                ref_from_del = weakref.ref(self)
698
699        w = Target()
700
701    def test_init(self):
702        # Issue 3634
703        # <weakref to class>.__init__() doesn't check errors correctly
704        r = weakref.ref(Exception)
705        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
706        # No exception should be raised here
707        gc.collect()
708
709    def test_classes(self):
710        # Check that both old-style classes and new-style classes
711        # are weakrefable.
712        class A(object):
713            pass
714        class B:
715            pass
716        l = []
717        weakref.ref(int)
718        a = weakref.ref(A, l.append)
719        A = None
720        gc.collect()
721        self.assertEqual(a(), None)
722        self.assertEqual(l, [a])
723        b = weakref.ref(B, l.append)
724        B = None
725        gc.collect()
726        self.assertEqual(b(), None)
727        self.assertEqual(l, [a, b])
728
729    def test_equality(self):
730        # Alive weakrefs defer equality testing to their underlying object.
731        x = Object(1)
732        y = Object(1)
733        z = Object(2)
734        a = weakref.ref(x)
735        b = weakref.ref(y)
736        c = weakref.ref(z)
737        d = weakref.ref(x)
738        # Note how we directly test the operators here, to stress both
739        # __eq__ and __ne__.
740        self.assertTrue(a == b)
741        self.assertFalse(a != b)
742        self.assertFalse(a == c)
743        self.assertTrue(a != c)
744        self.assertTrue(a == d)
745        self.assertFalse(a != d)
746        del x, y, z
747        gc.collect()
748        for r in a, b, c:
749            # Sanity check
750            self.assertIs(r(), None)
751        # Dead weakrefs compare by identity: whether `a` and `d` are the
752        # same weakref object is an implementation detail, since they pointed
753        # to the same original object and didn't have a callback.
754        # (see issue #16453).
755        self.assertFalse(a == b)
756        self.assertTrue(a != b)
757        self.assertFalse(a == c)
758        self.assertTrue(a != c)
759        self.assertEqual(a == d, a is d)
760        self.assertEqual(a != d, a is not d)
761
762    def test_hashing(self):
763        # Alive weakrefs hash the same as the underlying object
764        x = Object(42)
765        y = Object(42)
766        a = weakref.ref(x)
767        b = weakref.ref(y)
768        self.assertEqual(hash(a), hash(42))
769        del x, y
770        gc.collect()
771        # Dead weakrefs:
772        # - retain their hash is they were hashed when alive;
773        # - otherwise, cannot be hashed.
774        self.assertEqual(hash(a), hash(42))
775        self.assertRaises(TypeError, hash, b)
776
    def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        class C(object):
            def __init__(self, parent):
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    # The result is not used; presumably dereferencing
                    # wself is the point (see the issue summary above).
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        for n in range(100):
            # Targets are assigned left to right: the previous c becomes
            # the (weak) key, and the new instance is both the value and
            # the new c.
            d[c] = c = C(c)
        del root
        gc.collect()
797
798
class SubclassableWeakrefTestCase(TestBase):
    # Tests for user-defined subclasses of weakref.ref.

    def test_subclass_refs(self):
        # A subclass may take extra constructor arguments and override
        # __call__ while still behaving like a plain ref.
        class MyRef(weakref.ref):
            def __init__(self, ob, callback=None, value=42):
                self.value = value
                super(MyRef, self).__init__(ob, callback)
            def __call__(self):
                self.called = True
                return super(MyRef, self).__call__()
        o = Object("foo")
        mr = MyRef(o, value=24)
        self.assertIs(mr(), o)
        self.assertTrue(mr.called)
        self.assertEqual(mr.value, 24)
        del o
        self.assertIsNone(mr())
        self.assertTrue(mr.called)

    def test_subclass_refs_dont_replace_standard_refs(self):
        # Subclass refs live alongside the plain ref to the same object:
        # they are separate objects, and the plain ref is listed first.
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o)
        r2 = weakref.ref(o)
        self.assertIsNot(r1, r2)
        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
        self.assertEqual(weakref.getweakrefcount(o), 2)
        r3 = MyRef(o)
        self.assertEqual(weakref.getweakrefcount(o), 3)
        refs = weakref.getweakrefs(o)
        self.assertEqual(len(refs), 3)
        self.assertIs(r2, refs[0])
        # Only membership is checked for the two subclass refs, not
        # their relative order.
        self.assertIn(r1, refs[1:])
        self.assertIn(r3, refs[1:])

    def test_subclass_refs_dont_conflate_callbacks(self):
        # Two subclass refs with different callbacks stay distinct.
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o, id)
        r2 = MyRef(o, str)
        self.assertIsNot(r1, r2)
        refs = weakref.getweakrefs(o)
        self.assertIn(r1, refs)
        self.assertIn(r2, refs)

    def test_subclass_refs_with_slots(self):
        # A subclass using __slots__ stores its extra state in the slots
        # and has no instance __dict__.
        class MyRef(weakref.ref):
            __slots__ = "slot1", "slot2"
            # First parameter named cls (not `type`) so the builtin is
            # not shadowed; it is only ever passed positionally.
            def __new__(cls, ob, callback, slot1, slot2):
                return weakref.ref.__new__(cls, ob, callback)
            def __init__(self, ob, callback, slot1, slot2):
                self.slot1 = slot1
                self.slot2 = slot2
            def meth(self):
                return self.slot1 + self.slot2
        o = Object(42)
        r = MyRef(o, None, "abc", "def")
        self.assertEqual(r.slot1, "abc")
        self.assertEqual(r.slot2, "def")
        self.assertEqual(r.meth(), "abcdef")
        self.assertFalse(hasattr(r, "__dict__"))

    def test_subclass_refs_with_cycle(self):
        # Bug #3110
        # An instance of a weakref subclass can have attributes.
        # If such a weakref holds the only strong reference to the object,
        # deleting the weakref will delete the object. In this case,
        # the callback must not be called, because the ref object is
        # being deleted.
        class MyRef(weakref.ref):
            pass

        # Use a local callback, for "regrtest -R::"
        # to detect refcounting problems
        def callback(w):
            self.cbcalled += 1

        o = C()
        r1 = MyRef(o, callback)
        r1.o = o
        del o

        del r1 # Used to crash here

        self.assertEqual(self.cbcalled, 0)

        # Same test, with two weakrefs to the same object
        # (since code paths are different)
        o = C()
        r1 = MyRef(o, callback)
        r2 = MyRef(o, callback)
        r1.r = r2
        r2.o = o
        del o
        del r2

        del r1 # Used to crash here

        self.assertEqual(self.cbcalled, 0)
900
901
902class MappingTestCase(TestBase):
903
904    COUNT = 10
905
906    def check_len_cycles(self, dict_type, cons):
907        N = 20
908        items = [RefCycle() for i in range(N)]
909        dct = dict_type(cons(o) for o in items)
910        # Keep an iterator alive
911        it = dct.iteritems()
912        try:
913            next(it)
914        except StopIteration:
915            pass
916        del items
917        gc.collect()
918        n1 = len(dct)
919        del it
920        gc.collect()
921        n2 = len(dct)
922        # one item may be kept alive inside the iterator
923        self.assertIn(n1, (0, 1))
924        self.assertEqual(n2, 0)
925
926    def test_weak_keyed_len_cycles(self):
927        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
928
929    def test_weak_valued_len_cycles(self):
930        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
931
    def check_len_race(self, dict_type, cons):
        # Extended sanity checks for len() in the face of cyclic collection
        #
        # `dict_type` is the weak dictionary class under test and `cons`
        # turns one RefCycle object into a (key, value) pair for it.

        # Put the current GC thresholds back once the test is over, since
        # the loop below overwrites them.
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        # Repeat the scenario with every threshold value from 1 to 99.
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.iteritems()
            try:
                next(it)
            except StopIteration:
                pass
            # Length while the partially consumed iterator is still alive.
            n1 = len(dct)
            del it
            # Length after the iterator has been released.
            n2 = len(dct)
            # Only bounds are asserted: 0 <= n2 <= n1 <= N.
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
955
956    def test_weak_keyed_len_race(self):
957        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
958
959    def test_weak_valued_len_race(self):
960        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
961
962    def test_weak_values(self):
963        #
964        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
965        #
966        dict, objects = self.make_weak_valued_dict()
967        for o in objects:
968            self.assertTrue(weakref.getweakrefcount(o) == 1,
969                         "wrong number of weak references to %r!" % o)
970            self.assertTrue(o is dict[o.arg],
971                         "wrong object returned by weak dict!")
972        items1 = dict.items()
973        items2 = dict.copy().items()
974        items1.sort()
975        items2.sort()
976        self.assertTrue(items1 == items2,
977                     "cloning of weak-valued dictionary did not work!")
978        del items1, items2
979        self.assertTrue(len(dict) == self.COUNT)
980        del objects[0]
981        self.assertTrue(len(dict) == (self.COUNT - 1),
982                     "deleting object did not cause dictionary update")
983        del objects, o
984        self.assertTrue(len(dict) == 0,
985                     "deleting the values did not clear the dictionary")
986        # regression on SF bug #447152:
987        dict = weakref.WeakValueDictionary()
988        self.assertRaises(KeyError, dict.__getitem__, 1)
989        dict[2] = C()
990        self.assertRaises(KeyError, dict.__getitem__, 2)
991
992    def test_weak_keys(self):
993        #
994        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
995        #  len(d), in d.
996        #
997        dict, objects = self.make_weak_keyed_dict()
998        for o in objects:
999            self.assertTrue(weakref.getweakrefcount(o) == 1,
1000                         "wrong number of weak references to %r!" % o)
1001            self.assertTrue(o.arg is dict[o],
1002                         "wrong object returned by weak dict!")
1003        items1 = dict.items()
1004        items2 = dict.copy().items()
1005        self.assertTrue(set(items1) == set(items2),
1006                     "cloning of weak-keyed dictionary did not work!")
1007        del items1, items2
1008        self.assertTrue(len(dict) == self.COUNT)
1009        del objects[0]
1010        self.assertTrue(len(dict) == (self.COUNT - 1),
1011                     "deleting object did not cause dictionary update")
1012        del objects, o
1013        self.assertTrue(len(dict) == 0,
1014                     "deleting the keys did not clear the dictionary")
1015        o = Object(42)
1016        dict[o] = "What is the meaning of the universe?"
1017        self.assertIn(o, dict)
1018        self.assertNotIn(34, dict)
1019
1020    def test_weak_keyed_iters(self):
1021        dict, objects = self.make_weak_keyed_dict()
1022        self.check_iters(dict)
1023
1024        # Test keyrefs()
1025        refs = dict.keyrefs()
1026        self.assertEqual(len(refs), len(objects))
1027        objects2 = list(objects)
1028        for wr in refs:
1029            ob = wr()
1030            self.assertIn(ob, dict)
1031            self.assertEqual(ob.arg, dict[ob])
1032            objects2.remove(ob)
1033        self.assertEqual(len(objects2), 0)
1034
1035        # Test iterkeyrefs()
1036        objects2 = list(objects)
1037        self.assertEqual(len(list(dict.iterkeyrefs())), len(objects))
1038        for wr in dict.iterkeyrefs():
1039            ob = wr()
1040            self.assertIn(ob, dict)
1041            self.assertEqual(ob.arg, dict[ob])
1042            objects2.remove(ob)
1043        self.assertEqual(len(objects2), 0)
1044
1045    def test_weak_valued_iters(self):
1046        dict, objects = self.make_weak_valued_dict()
1047        self.check_iters(dict)
1048
1049        # Test valuerefs()
1050        refs = dict.valuerefs()
1051        self.assertEqual(len(refs), len(objects))
1052        objects2 = list(objects)
1053        for wr in refs:
1054            ob = wr()
1055            self.assertEqual(ob, dict[ob.arg])
1056            self.assertEqual(ob.arg, dict[ob.arg].arg)
1057            objects2.remove(ob)
1058        self.assertEqual(len(objects2), 0)
1059
1060        # Test itervaluerefs()
1061        objects2 = list(objects)
1062        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1063        for wr in dict.itervaluerefs():
1064            ob = wr()
1065            self.assertEqual(ob, dict[ob.arg])
1066            self.assertEqual(ob.arg, dict[ob.arg].arg)
1067            objects2.remove(ob)
1068        self.assertEqual(len(objects2), 0)
1069
1070    def check_iters(self, dict):
1071        # item iterator:
1072        items = dict.items()
1073        for item in dict.iteritems():
1074            items.remove(item)
1075        self.assertTrue(len(items) == 0, "iteritems() did not touch all items")
1076
1077        # key iterator, via __iter__():
1078        keys = dict.keys()
1079        for k in dict:
1080            keys.remove(k)
1081        self.assertTrue(len(keys) == 0, "__iter__() did not touch all keys")
1082
1083        # key iterator, via iterkeys():
1084        keys = dict.keys()
1085        for k in dict.iterkeys():
1086            keys.remove(k)
1087        self.assertTrue(len(keys) == 0, "iterkeys() did not touch all keys")
1088
1089        # value iterator:
1090        values = dict.values()
1091        for v in dict.itervalues():
1092            values.remove(v)
1093        self.assertTrue(len(values) == 0,
1094                     "itervalues() did not touch all values")
1095
1096    def test_make_weak_keyed_dict_from_dict(self):
1097        o = Object(3)
1098        dict = weakref.WeakKeyDictionary({o:364})
1099        self.assertTrue(dict[o] == 364)
1100
1101    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1102        o = Object(3)
1103        dict = weakref.WeakKeyDictionary({o:364})
1104        dict2 = weakref.WeakKeyDictionary(dict)
1105        self.assertTrue(dict[o] == 364)
1106
1107    def make_weak_keyed_dict(self):
1108        dict = weakref.WeakKeyDictionary()
1109        objects = map(Object, range(self.COUNT))
1110        for o in objects:
1111            dict[o] = o.arg
1112        return dict, objects
1113
1114    def make_weak_valued_dict(self):
1115        dict = weakref.WeakValueDictionary()
1116        objects = map(Object, range(self.COUNT))
1117        for o in objects:
1118            dict[o.arg] = o
1119        return dict, objects
1120
1121    def check_popitem(self, klass, key1, value1, key2, value2):
1122        weakdict = klass()
1123        weakdict[key1] = value1
1124        weakdict[key2] = value2
1125        self.assertTrue(len(weakdict) == 2)
1126        k, v = weakdict.popitem()
1127        self.assertTrue(len(weakdict) == 1)
1128        if k is key1:
1129            self.assertTrue(v is value1)
1130        else:
1131            self.assertTrue(v is value2)
1132        k, v = weakdict.popitem()
1133        self.assertTrue(len(weakdict) == 0)
1134        if k is key1:
1135            self.assertTrue(v is value1)
1136        else:
1137            self.assertTrue(v is value2)
1138
1139    def test_weak_valued_dict_popitem(self):
1140        self.check_popitem(weakref.WeakValueDictionary,
1141                           "key1", C(), "key2", C())
1142
1143    def test_weak_keyed_dict_popitem(self):
1144        self.check_popitem(weakref.WeakKeyDictionary,
1145                           C(), "value 1", C(), "value 2")
1146
1147    def check_setdefault(self, klass, key, value1, value2):
1148        self.assertTrue(value1 is not value2,
1149                     "invalid test"
1150                     " -- value parameters must be distinct objects")
1151        weakdict = klass()
1152        o = weakdict.setdefault(key, value1)
1153        self.assertIs(o, value1)
1154        self.assertIn(key, weakdict)
1155        self.assertIs(weakdict.get(key), value1)
1156        self.assertIs(weakdict[key], value1)
1157
1158        o = weakdict.setdefault(key, value2)
1159        self.assertIs(o, value1)
1160        self.assertIn(key, weakdict)
1161        self.assertIs(weakdict.get(key), value1)
1162        self.assertIs(weakdict[key], value1)
1163
1164    def test_weak_valued_dict_setdefault(self):
1165        self.check_setdefault(weakref.WeakValueDictionary,
1166                              "key", C(), C())
1167
1168    def test_weak_keyed_dict_setdefault(self):
1169        self.check_setdefault(weakref.WeakKeyDictionary,
1170                              C(), "value 1", "value 2")
1171
1172    def check_update(self, klass, dict):
1173        #
1174        #  This exercises d.update(), len(d), d.keys(), in d,
1175        #  d.get(), d[].
1176        #
1177        weakdict = klass()
1178        weakdict.update(dict)
1179        self.assertEqual(len(weakdict), len(dict))
1180        for k in weakdict.keys():
1181            self.assertIn(k, dict,
1182                         "mysterious new key appeared in weak dict")
1183            v = dict.get(k)
1184            self.assertIs(v, weakdict[k])
1185            self.assertIs(v, weakdict.get(k))
1186        for k in dict.keys():
1187            self.assertIn(k, weakdict,
1188                         "original key disappeared in weak dict")
1189            v = dict[k]
1190            self.assertIs(v, weakdict[k])
1191            self.assertIs(v, weakdict.get(k))
1192
1193    def test_weak_valued_dict_update(self):
1194        self.check_update(weakref.WeakValueDictionary,
1195                          {1: C(), 'a': C(), C(): C()})
1196
1197    def test_weak_keyed_dict_update(self):
1198        self.check_update(weakref.WeakKeyDictionary,
1199                          {C(): 1, C(): 2, C(): 3})
1200
1201    def test_weak_keyed_delitem(self):
1202        d = weakref.WeakKeyDictionary()
1203        o1 = Object('1')
1204        o2 = Object('2')
1205        d[o1] = 'something'
1206        d[o2] = 'something'
1207        self.assertTrue(len(d) == 2)
1208        del d[o1]
1209        self.assertTrue(len(d) == 1)
1210        self.assertTrue(d.keys() == [o2])
1211
1212    def test_weak_valued_delitem(self):
1213        d = weakref.WeakValueDictionary()
1214        o1 = Object('1')
1215        o2 = Object('2')
1216        d['something'] = o1
1217        d['something else'] = o2
1218        self.assertTrue(len(d) == 2)
1219        del d['something']
1220        self.assertTrue(len(d) == 1)
1221        self.assertTrue(d.items() == [('something else', o2)])
1222
1223    def test_weak_keyed_bad_delitem(self):
1224        d = weakref.WeakKeyDictionary()
1225        o = Object('1')
1226        # An attempt to delete an object that isn't there should raise
1227        # KeyError.  It didn't before 2.3.
1228        self.assertRaises(KeyError, d.__delitem__, o)
1229        self.assertRaises(KeyError, d.__getitem__, o)
1230
1231        # If a key isn't of a weakly referencable type, __getitem__ and
1232        # __setitem__ raise TypeError.  __delitem__ should too.
1233        self.assertRaises(TypeError, d.__delitem__,  13)
1234        self.assertRaises(TypeError, d.__getitem__,  13)
1235        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1236
    def test_weak_keyed_cascading_deletes(self):
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.

        d = weakref.WeakKeyDictionary()
        # Flag read by C.__eq__ below; while False, comparisons have no
        # side effect.
        mutate = False

        # Local key class (it shadows the module-level C inside this test).
        # Hash and equality are based on .value; once `mutate` is set,
        # every equality check also drops the last element of `objs`.
        class C(object):
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del objs[-1]
                return self.value == other.value

        objs = [C(i) for i in range(4)]
        for o in objs:
            d[o] = o.value
        del o   # now the only strong references to keys are in objs
        # Find the order in which iterkeys sees the keys.
        objs = d.keys()
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        objs.reverse()

        # Turn on mutation in C.__eq__.  The first time thru the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for o in obj" loop would have gotten to.
        mutate = True
        # Counts how many times the loop body actually runs; `objs` shrinks
        # while it is being iterated.
        count = 0
        for o in objs:
            count += 1
            del d[o]
        # All four keys must be gone although only two explicit deletions
        # took place (presumably one __eq__ call, and hence one dropped
        # key, per deletion).
        self.assertEqual(len(d), 0)
        self.assertEqual(count, 2)
1281
1282from test import mapping_tests
1283
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests on WeakValueDictionary."""
    type2test = weakref.WeakValueDictionary
    # Class-level reference mapping; it holds the Object values strongly
    # for as long as the class exists.
    __ref = {
        "key1": Object(1),
        "key2": Object(2),
        "key3": Object(3),
    }

    def _reference(self):
        # Hand out a shallow copy so that the shared mapping is never
        # modified through the returned dict.
        return dict(self.__ref)
1290
class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests on WeakKeyDictionary."""
    type2test = weakref.WeakKeyDictionary
    # Class-level reference mapping; it holds the Object keys strongly
    # for as long as the class exists.
    __ref = {
        Object("key1"): 1,
        Object("key2"): 2,
        Object("key3"): 3,
    }

    def _reference(self):
        # Hand out a shallow copy so that the shared mapping is never
        # modified through the returned dict.
        return dict(self.__ref)
1297
# Doctest source mirroring the examples of the weakref library reference,
# plus a few extra checks marked "just testing".  It is exposed to doctest
# through the __test__ mapping below.
libreftest = """ Doctest for examples in the library reference: weakref.rst

>>> import weakref
>>> class Dict(dict):
...     pass
...
>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
>>> r = weakref.ref(obj)
>>> print r() is obj
True

>>> import weakref
>>> class Object:
...     pass
...
>>> o = Object()
>>> r = weakref.ref(o)
>>> o2 = r()
>>> o is o2
True
>>> del o, o2
>>> print r()
None

>>> import weakref
>>> class ExtendedRef(weakref.ref):
...     def __init__(self, ob, callback=None, **annotations):
...         super(ExtendedRef, self).__init__(ob, callback)
...         self.__counter = 0
...         for k, v in annotations.iteritems():
...             setattr(self, k, v)
...     def __call__(self):
...         '''Return a pair containing the referent and the number of
...         times the reference has been called.
...         '''
...         ob = super(ExtendedRef, self).__call__()
...         if ob is not None:
...             self.__counter += 1
...             ob = (ob, self.__counter)
...         return ob
...
>>> class A:   # not in docs from here, just testing the ExtendedRef
...     pass
...
>>> a = A()
>>> r = ExtendedRef(a, foo=1, bar="baz")
>>> r.foo
1
>>> r.bar
'baz'
>>> r()[1]
1
>>> r()[1]
2
>>> r()[0] is a
True


>>> import weakref
>>> _id2obj_dict = weakref.WeakValueDictionary()
>>> def remember(obj):
...     oid = id(obj)
...     _id2obj_dict[oid] = obj
...     return oid
...
>>> def id2obj(oid):
...     return _id2obj_dict[oid]
...
>>> a = A()             # from here, just testing
>>> a_id = remember(a)
>>> id2obj(a_id) is a
True
>>> del a
>>> try:
...     id2obj(a_id)
... except KeyError:
...     print 'OK'
... else:
...     print 'WeakValueDictionary error'
OK

"""

# doctest looks up the module-level __test__ dict for additional tests.
__test__ = {'libreftest' : libreftest}
1382
def test_main():
    # Entry point: run every TestCase class of this module, then run
    # doctest over the module object itself.
    cases = (
        ReferencesTestCase,
        MappingTestCase,
        WeakValueDictionaryTestCase,
        WeakKeyDictionaryTestCase,
        SubclassableWeakrefTestCase,
    )
    test_support.run_unittest(*cases)
    this_module = sys.modules[__name__]
    test_support.run_doctest(this_module)
1392
1393
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
1396