1"""Unit tests for contextlib.py, and other context managers."""
2
3import io
4import sys
5import tempfile
6import unittest
7from contextlib import *  # Tests __all__
8from test import support
9try:
10    import threading
11except ImportError:
12    threading = None
13
14
class TestAbstractContextManager(unittest.TestCase):
    """Checks for the AbstractContextManager abstract base class."""

    def test_enter(self):
        """The inherited __enter__ hands back the manager itself."""
        class OnlyExit(AbstractContextManager):
            def __exit__(self, *exc_info):
                super().__exit__(*exc_info)

        cm = OnlyExit()
        entered = cm.__enter__()
        self.assertIs(entered, cm)

    def test_exit_is_abstract(self):
        """A subclass that omits __exit__ cannot be instantiated."""
        class NoExit(AbstractContextManager):
            pass

        self.assertRaises(TypeError, NoExit)

    def test_structural_subclassing(self):
        """issubclass() accepts duck-typed and explicit subclasses alike."""
        class DuckTyped:
            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc_value, traceback):
                return None

        class OnlyExit(AbstractContextManager):
            def __exit__(self, *exc_info):
                super().__exit__(*exc_info)

        for candidate in (DuckTyped, OnlyExit):
            self.assertTrue(issubclass(candidate, AbstractContextManager))
46
47
class ContextManagerTestCase(unittest.TestCase):
    """Exercise generator-based context managers built with @contextmanager."""

    def test_contextmanager_plain(self):
        """Code before and after the yield runs around the with-body."""
        events = []

        @contextmanager
        def managed():
            events.append(1)
            yield 42
            events.append(999)

        with managed() as value:
            self.assertEqual(events, [1])
            self.assertEqual(value, 42)
            events.append(value)
        self.assertEqual(events, [1, 42, 999])

    def test_contextmanager_finally(self):
        """A finally clause in the generator runs when the body raises."""
        events = []

        @contextmanager
        def managed():
            events.append(1)
            try:
                yield 42
            finally:
                events.append(999)

        with self.assertRaises(ZeroDivisionError):
            with managed() as value:
                self.assertEqual(events, [1])
                self.assertEqual(value, 42)
                events.append(value)
                raise ZeroDivisionError()
        self.assertEqual(events, [1, 42, 999])

    def test_contextmanager_no_reraise(self):
        """__exit__ reports 'not handled' instead of raising by itself."""
        @contextmanager
        def passthrough():
            yield

        cm = passthrough()
        cm.__enter__()
        # Calling __exit__ should not result in an exception
        outcome = cm.__exit__(TypeError, TypeError("foo"), None)
        self.assertFalse(outcome)

    def test_contextmanager_trap_yield_after_throw(self):
        """A generator that yields again after a throw is an error."""
        @contextmanager
        def yields_twice():
            try:
                yield
            except:  # deliberately bare: swallow whatever gets thrown in
                yield

        cm = yields_twice()
        cm.__enter__()
        with self.assertRaises(RuntimeError):
            cm.__exit__(TypeError, TypeError("foo"), None)

    def test_contextmanager_except(self):
        """The generator can catch, and thereby suppress, a body exception."""
        events = []

        @contextmanager
        def managed():
            events.append(1)
            try:
                yield 42
            except ZeroDivisionError as err:
                events.append(err.args[0])
                self.assertEqual(events, [1, 42, 999])

        with managed() as value:
            self.assertEqual(events, [1])
            self.assertEqual(value, 42)
            events.append(value)
            raise ZeroDivisionError(999)
        self.assertEqual(events, [1, 42, 999])

    def test_contextmanager_except_stopiter(self):
        """StopIteration from the body propagates, with a deprecation warning."""
        raised = StopIteration('spam')

        @contextmanager
        def managed():
            yield

        try:
            with self.assertWarnsRegex(DeprecationWarning,
                                       "StopIteration"):
                with managed():
                    raise raised
        except Exception as caught:
            self.assertIs(caught, raised)
        else:
            self.fail('StopIteration was suppressed')

    def test_contextmanager_except_pep479(self):
        """Same propagation check with generator_stop enabled (PEP 479)."""
        code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
    yield
"""
        namespace = {}
        exec(code, namespace, namespace)
        managed = namespace['woohoo']

        raised = StopIteration('spam')
        try:
            with managed():
                raise raised
        except Exception as caught:
            self.assertIs(caught, raised)
        else:
            self.fail('StopIteration was suppressed')

    def _create_contextmanager_attribs(self):
        """Build a @contextmanager function carrying a custom attribute."""
        def attribs(**attrs):
            def decorate(func):
                for attr_name, attr_value in attrs.items():
                    setattr(func, attr_name, attr_value)
                return func
            return decorate

        @contextmanager
        @attribs(foo='bar')
        def baz(spam):
            """Whee!"""

        return baz

    def test_contextmanager_attribs(self):
        """Name and custom attributes survive the decoration."""
        decorated = self._create_contextmanager_attribs()
        self.assertEqual(decorated.__name__, 'baz')
        self.assertEqual(decorated.foo, 'bar')

    @support.requires_docstrings
    def test_contextmanager_doc_attrib(self):
        """The docstring survives the decoration."""
        decorated = self._create_contextmanager_attribs()
        self.assertEqual(decorated.__doc__, "Whee!")

    @support.requires_docstrings
    def test_instance_docstring_given_cm_docstring(self):
        """Manager instances expose the decorated function's docstring."""
        instance = self._create_contextmanager_attribs()(None)
        self.assertEqual(instance.__doc__, "Whee!")

    def test_keywords(self):
        # Ensure no keyword arguments are inhibited
        @contextmanager
        def echo(self, func, args, kwds):
            yield (self, func, args, kwds)

        with echo(self=11, func=22, args=33, kwds=44) as received:
            self.assertEqual(received, (11, 22, 33, 44))
190
191
class ClosingTestCase(unittest.TestCase):
    """Tests for contextlib.closing."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        instance = closing(None)
        self.assertEqual(instance.__doc__, closing.__doc__)

    def test_closing(self):
        """close() is called once when the with-block ends normally."""
        calls = []

        class Closable:
            def close(self):
                calls.append(1)

        resource = Closable()
        self.assertEqual(calls, [])
        with closing(resource) as entered:
            self.assertEqual(resource, entered)
        self.assertEqual(calls, [1])

    def test_closing_error(self):
        """close() is still called when the with-block raises."""
        calls = []

        class Closable:
            def close(self):
                calls.append(1)

        resource = Closable()
        self.assertEqual(calls, [])
        with self.assertRaises(ZeroDivisionError):
            with closing(resource) as entered:
                self.assertEqual(resource, entered)
                1 / 0
        self.assertEqual(calls, [1])
224
class FileContextTestCase(unittest.TestCase):
    """Check that file objects behave as context managers."""

    def testWithOpen(self):
        """A file is closed on leaving its with-block, even on error."""
        import os.path  # local import: only needed to build the scratch path

        # A private temporary directory replaces the deprecated, race-prone
        # tempfile.mktemp(); the directory and its contents are removed when
        # this with-block exits, whether or not the assertions pass.
        with tempfile.TemporaryDirectory() as tmpdir:
            tfn = os.path.join(tmpdir, "booh.txt")

            with open(tfn, "w") as f:
                self.assertFalse(f.closed)
                f.write("Booh\n")
            self.assertTrue(f.closed)

            with self.assertRaises(ZeroDivisionError):
                with open(tfn, "r") as f:
                    self.assertFalse(f.closed)
                    self.assertEqual(f.read(), "Booh\n")
                    1 / 0
            # The exception must not have prevented the file being closed.
            self.assertTrue(f.closed)
244
@unittest.skipUnless(threading, 'Threading required for this test.')
class LockContextTestCase(unittest.TestCase):
    """Check that threading primitives work as context managers."""

    def boilerPlate(self, lock, locked):
        """Verify *lock* is held inside a with-block and released after it.

        *locked* is a zero-argument callable reporting whether the lock is
        currently held.
        """
        self.assertFalse(locked())
        with lock:
            self.assertTrue(locked())
        self.assertFalse(locked())

        # The lock must also be released when the body raises.
        with self.assertRaises(ZeroDivisionError):
            with lock:
                self.assertTrue(locked())
                1 / 0
        self.assertFalse(locked())

    @staticmethod
    def _semaphore_probe(sem):
        """Return a callable telling whether *sem* cannot be acquired now."""
        def locked():
            # Non-blocking acquire: success means it was free, so undo it.
            acquired = sem.acquire(False)
            if acquired:
                sem.release()
            return not acquired
        return locked

    def testWithLock(self):
        mutex = threading.Lock()
        self.boilerPlate(mutex, mutex.locked)

    def testWithRLock(self):
        rlock = threading.RLock()
        self.boilerPlate(rlock, rlock._is_owned)

    def testWithCondition(self):
        cond = threading.Condition()
        self.boilerPlate(cond, lambda: cond._is_owned())

    def testWithSemaphore(self):
        sem = threading.Semaphore()
        self.boilerPlate(sem, self._semaphore_probe(sem))

    def testWithBoundedSemaphore(self):
        sem = threading.BoundedSemaphore()
        self.boilerPlate(sem, self._semaphore_probe(sem))
292
293
class mycontext(ContextDecorator):
    """Example decoration-compatible context manager for testing"""
    # Class-level defaults; instances overwrite them as they are used.
    catch = False    # returned from __exit__; a true value suppresses errors
    exc = None       # exception details recorded by the latest __exit__
    started = False  # becomes True once __enter__ has run

    def __enter__(self):
        """Flag the manager as started and return it."""
        self.started = True
        return self

    def __exit__(self, *exc_details):
        """Record the exception details and return the ``catch`` flag."""
        self.exc = exc_details
        return self.catch
307
308
class TestContextDecorator(unittest.TestCase):
    """Tests for ContextDecorator, used as a manager and as a decorator."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        instance = mycontext()
        self.assertEqual(instance.__doc__, mycontext.__doc__)

    def test_contextdecorator(self):
        """Plain with-statement use still works."""
        cm = mycontext()
        with cm as entered:
            self.assertIs(entered, cm)
            self.assertTrue(cm.started)

        self.assertEqual(cm.exc, (None, None, None))

    def test_contextdecorator_with_exception(self):
        """Exception details reach __exit__, whether or not it suppresses."""
        # Not suppressing: the exception escapes the with-block.
        cm = mycontext()
        with self.assertRaisesRegex(NameError, 'foo'):
            with cm:
                raise NameError('foo')
        self.assertIsNotNone(cm.exc)
        self.assertIs(cm.exc[0], NameError)

        # Suppressing: the exception is swallowed but still recorded.
        cm = mycontext()
        cm.catch = True
        with cm:
            raise NameError('foo')
        self.assertIsNotNone(cm.exc)
        self.assertIs(cm.exc[0], NameError)

    def test_decorator(self):
        """The decorated function runs inside the context."""
        cm = mycontext()

        @cm
        def decorated():
            self.assertIsNone(cm.exc)
            self.assertTrue(cm.started)

        decorated()
        self.assertEqual(cm.exc, (None, None, None))

    def test_decorator_with_exception(self):
        """An exception from the decorated function is seen by __exit__."""
        cm = mycontext()

        @cm
        def decorated():
            self.assertIsNone(cm.exc)
            self.assertTrue(cm.started)
            raise NameError('foo')

        with self.assertRaisesRegex(NameError, 'foo'):
            decorated()
        self.assertIsNotNone(cm.exc)
        self.assertIs(cm.exc[0], NameError)

    def test_decorating_method(self):
        """Arguments pass through unchanged when decorating a method."""
        cm = mycontext()

        class Target(object):

            @cm
            def method(self, a, b, c=None):
                self.a = a
                self.b = b
                self.c = c

        # these tests are for argument passing when used as a decorator
        scenarios = (
            ((1, 2), {}, (('a', 1), ('b', 2), ('c', None))),
            (('a', 'b', 'c'), {}, (('a', 'a'), ('b', 'b'), ('c', 'c'))),
            ((), {'a': 1, 'b': 2}, (('a', 1), ('b', 2))),
        )
        for args, kwargs, expected in scenarios:
            target = Target()
            target.method(*args, **kwargs)
            for attr, value in expected:
                self.assertEqual(getattr(target, attr), value)

    def test_typo_enter(self):
        """A misspelt __enter__ makes the with-statement fail."""
        class MissingEnter(ContextDecorator):
            def __unter__(self):
                pass

            def __exit__(self, *exc):
                pass

        with self.assertRaises(AttributeError):
            with MissingEnter():
                pass

    def test_typo_exit(self):
        """A misspelt __exit__ makes the with-statement fail."""
        class MissingExit(ContextDecorator):
            def __enter__(self):
                pass

            def __uxit__(self, *exc):
                pass

        with self.assertRaises(AttributeError):
            with MissingExit():
                pass

    def test_contextdecorator_as_mixin(self):
        """ContextDecorator can be mixed into an existing manager class."""
        class PlainManager(object):
            started = False
            exc = None

            def __enter__(self):
                self.started = True
                return self

            def __exit__(self, *exc):
                self.exc = exc

        class Mixed(PlainManager, ContextDecorator):
            pass

        cm = Mixed()

        @cm
        def decorated():
            self.assertIsNone(cm.exc)
            self.assertTrue(cm.started)

        decorated()
        self.assertEqual(cm.exc, (None, None, None))

    def test_contextmanager_as_decorator(self):
        """@contextmanager objects can themselves decorate functions."""
        events = []

        @contextmanager
        def managed(marker):
            events.append(marker)
            yield
            events.append(999)

        @managed(1)
        def decorated(item):
            self.assertEqual(events, [1])
            events.append(item)

        decorated('something')
        self.assertEqual(events, [1, 'something', 999])

        # Issue #11647: Ensure the decorated function is 'reusable'
        events = []
        decorated('something else')
        self.assertEqual(events, [1, 'something else', 999])
467
468
469class TestExitStack(unittest.TestCase):
470
471    @support.requires_docstrings
472    def test_instance_docs(self):
473        # Issue 19330: ensure context manager instances have good docstrings
474        cm_docstring = ExitStack.__doc__
475        obj = ExitStack()
476        self.assertEqual(obj.__doc__, cm_docstring)
477
478    def test_no_resources(self):
479        with ExitStack():
480            pass
481
482    def test_callback(self):
483        expected = [
484            ((), {}),
485            ((1,), {}),
486            ((1,2), {}),
487            ((), dict(example=1)),
488            ((1,), dict(example=1)),
489            ((1,2), dict(example=1)),
490        ]
491        result = []
492        def _exit(*args, **kwds):
493            """Test metadata propagation"""
494            result.append((args, kwds))
495        with ExitStack() as stack:
496            for args, kwds in reversed(expected):
497                if args and kwds:
498                    f = stack.callback(_exit, *args, **kwds)
499                elif args:
500                    f = stack.callback(_exit, *args)
501                elif kwds:
502                    f = stack.callback(_exit, **kwds)
503                else:
504                    f = stack.callback(_exit)
505                self.assertIs(f, _exit)
506            for wrapper in stack._exit_callbacks:
507                self.assertIs(wrapper.__wrapped__, _exit)
508                self.assertNotEqual(wrapper.__name__, _exit.__name__)
509                self.assertIsNone(wrapper.__doc__, _exit.__doc__)
510        self.assertEqual(result, expected)
511
512    def test_push(self):
513        exc_raised = ZeroDivisionError
514        def _expect_exc(exc_type, exc, exc_tb):
515            self.assertIs(exc_type, exc_raised)
516        def _suppress_exc(*exc_details):
517            return True
518        def _expect_ok(exc_type, exc, exc_tb):
519            self.assertIsNone(exc_type)
520            self.assertIsNone(exc)
521            self.assertIsNone(exc_tb)
522        class ExitCM(object):
523            def __init__(self, check_exc):
524                self.check_exc = check_exc
525            def __enter__(self):
526                self.fail("Should not be called!")
527            def __exit__(self, *exc_details):
528                self.check_exc(*exc_details)
529        with ExitStack() as stack:
530            stack.push(_expect_ok)
531            self.assertIs(stack._exit_callbacks[-1], _expect_ok)
532            cm = ExitCM(_expect_ok)
533            stack.push(cm)
534            self.assertIs(stack._exit_callbacks[-1].__self__, cm)
535            stack.push(_suppress_exc)
536            self.assertIs(stack._exit_callbacks[-1], _suppress_exc)
537            cm = ExitCM(_expect_exc)
538            stack.push(cm)
539            self.assertIs(stack._exit_callbacks[-1].__self__, cm)
540            stack.push(_expect_exc)
541            self.assertIs(stack._exit_callbacks[-1], _expect_exc)
542            stack.push(_expect_exc)
543            self.assertIs(stack._exit_callbacks[-1], _expect_exc)
544            1/0
545
546    def test_enter_context(self):
547        class TestCM(object):
548            def __enter__(self):
549                result.append(1)
550            def __exit__(self, *exc_details):
551                result.append(3)
552
553        result = []
554        cm = TestCM()
555        with ExitStack() as stack:
556            @stack.callback  # Registered first => cleaned up last
557            def _exit():
558                result.append(4)
559            self.assertIsNotNone(_exit)
560            stack.enter_context(cm)
561            self.assertIs(stack._exit_callbacks[-1].__self__, cm)
562            result.append(2)
563        self.assertEqual(result, [1, 2, 3, 4])
564
565    def test_close(self):
566        result = []
567        with ExitStack() as stack:
568            @stack.callback
569            def _exit():
570                result.append(1)
571            self.assertIsNotNone(_exit)
572            stack.close()
573            result.append(2)
574        self.assertEqual(result, [1, 2])
575
576    def test_pop_all(self):
577        result = []
578        with ExitStack() as stack:
579            @stack.callback
580            def _exit():
581                result.append(3)
582            self.assertIsNotNone(_exit)
583            new_stack = stack.pop_all()
584            result.append(1)
585        result.append(2)
586        new_stack.close()
587        self.assertEqual(result, [1, 2, 3])
588
589    def test_exit_raise(self):
590        with self.assertRaises(ZeroDivisionError):
591            with ExitStack() as stack:
592                stack.push(lambda *exc: False)
593                1/0
594
595    def test_exit_suppress(self):
596        with ExitStack() as stack:
597            stack.push(lambda *exc: True)
598            1/0
599
600    def test_exit_exception_chaining_reference(self):
601        # Sanity check to make sure that ExitStack chaining matches
602        # actual nested with statements
603        class RaiseExc:
604            def __init__(self, exc):
605                self.exc = exc
606            def __enter__(self):
607                return self
608            def __exit__(self, *exc_details):
609                raise self.exc
610
611        class RaiseExcWithContext:
612            def __init__(self, outer, inner):
613                self.outer = outer
614                self.inner = inner
615            def __enter__(self):
616                return self
617            def __exit__(self, *exc_details):
618                try:
619                    raise self.inner
620                except:
621                    raise self.outer
622
623        class SuppressExc:
624            def __enter__(self):
625                return self
626            def __exit__(self, *exc_details):
627                type(self).saved_details = exc_details
628                return True
629
630        try:
631            with RaiseExc(IndexError):
632                with RaiseExcWithContext(KeyError, AttributeError):
633                    with SuppressExc():
634                        with RaiseExc(ValueError):
635                            1 / 0
636        except IndexError as exc:
637            self.assertIsInstance(exc.__context__, KeyError)
638            self.assertIsInstance(exc.__context__.__context__, AttributeError)
639            # Inner exceptions were suppressed
640            self.assertIsNone(exc.__context__.__context__.__context__)
641        else:
642            self.fail("Expected IndexError, but no exception was raised")
643        # Check the inner exceptions
644        inner_exc = SuppressExc.saved_details[1]
645        self.assertIsInstance(inner_exc, ValueError)
646        self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
647
648    def test_exit_exception_chaining(self):
649        # Ensure exception chaining matches the reference behaviour
650        def raise_exc(exc):
651            raise exc
652
653        saved_details = None
654        def suppress_exc(*exc_details):
655            nonlocal saved_details
656            saved_details = exc_details
657            return True
658
659        try:
660            with ExitStack() as stack:
661                stack.callback(raise_exc, IndexError)
662                stack.callback(raise_exc, KeyError)
663                stack.callback(raise_exc, AttributeError)
664                stack.push(suppress_exc)
665                stack.callback(raise_exc, ValueError)
666                1 / 0
667        except IndexError as exc:
668            self.assertIsInstance(exc.__context__, KeyError)
669            self.assertIsInstance(exc.__context__.__context__, AttributeError)
670            # Inner exceptions were suppressed
671            self.assertIsNone(exc.__context__.__context__.__context__)
672        else:
673            self.fail("Expected IndexError, but no exception was raised")
674        # Check the inner exceptions
675        inner_exc = saved_details[1]
676        self.assertIsInstance(inner_exc, ValueError)
677        self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
678
679    def test_exit_exception_non_suppressing(self):
680        # http://bugs.python.org/issue19092
681        def raise_exc(exc):
682            raise exc
683
684        def suppress_exc(*exc_details):
685            return True
686
687        try:
688            with ExitStack() as stack:
689                stack.callback(lambda: None)
690                stack.callback(raise_exc, IndexError)
691        except Exception as exc:
692            self.assertIsInstance(exc, IndexError)
693        else:
694            self.fail("Expected IndexError, but no exception was raised")
695
696        try:
697            with ExitStack() as stack:
698                stack.callback(raise_exc, KeyError)
699                stack.push(suppress_exc)
700                stack.callback(raise_exc, IndexError)
701        except Exception as exc:
702            self.assertIsInstance(exc, KeyError)
703        else:
704            self.fail("Expected KeyError, but no exception was raised")
705
706    def test_exit_exception_with_correct_context(self):
707        # http://bugs.python.org/issue20317
708        @contextmanager
709        def gets_the_context_right(exc):
710            try:
711                yield
712            finally:
713                raise exc
714
715        exc1 = Exception(1)
716        exc2 = Exception(2)
717        exc3 = Exception(3)
718        exc4 = Exception(4)
719
720        # The contextmanager already fixes the context, so prior to the
721        # fix, ExitStack would try to fix it *again* and get into an
722        # infinite self-referential loop
723        try:
724            with ExitStack() as stack:
725                stack.enter_context(gets_the_context_right(exc4))
726                stack.enter_context(gets_the_context_right(exc3))
727                stack.enter_context(gets_the_context_right(exc2))
728                raise exc1
729        except Exception as exc:
730            self.assertIs(exc, exc4)
731            self.assertIs(exc.__context__, exc3)
732            self.assertIs(exc.__context__.__context__, exc2)
733            self.assertIs(exc.__context__.__context__.__context__, exc1)
734            self.assertIsNone(
735                       exc.__context__.__context__.__context__.__context__)
736
737    def test_exit_exception_with_existing_context(self):
738        # Addresses a lack of test coverage discovered after checking in a
739        # fix for issue 20317 that still contained debugging code.
740        def raise_nested(inner_exc, outer_exc):
741            try:
742                raise inner_exc
743            finally:
744                raise outer_exc
745        exc1 = Exception(1)
746        exc2 = Exception(2)
747        exc3 = Exception(3)
748        exc4 = Exception(4)
749        exc5 = Exception(5)
750        try:
751            with ExitStack() as stack:
752                stack.callback(raise_nested, exc4, exc5)
753                stack.callback(raise_nested, exc2, exc3)
754                raise exc1
755        except Exception as exc:
756            self.assertIs(exc, exc5)
757            self.assertIs(exc.__context__, exc4)
758            self.assertIs(exc.__context__.__context__, exc3)
759            self.assertIs(exc.__context__.__context__.__context__, exc2)
760            self.assertIs(
761                 exc.__context__.__context__.__context__.__context__, exc1)
762            self.assertIsNone(
763                exc.__context__.__context__.__context__.__context__.__context__)
764
765
766
767    def test_body_exception_suppress(self):
768        def suppress_exc(*exc_details):
769            return True
770        try:
771            with ExitStack() as stack:
772                stack.push(suppress_exc)
773                1/0
774        except IndexError as exc:
775            self.fail("Expected no exception, got IndexError")
776
777    def test_exit_exception_chaining_suppress(self):
778        with ExitStack() as stack:
779            stack.push(lambda *exc: True)
780            stack.push(lambda *exc: 1/0)
781            stack.push(lambda *exc: {}[1])
782
783    def test_excessive_nesting(self):
784        # The original implementation would die with RecursionError here
785        with ExitStack() as stack:
786            for i in range(10000):
787                stack.callback(int)
788
789    def test_instance_bypass(self):
790        class Example(object): pass
791        cm = Example()
792        cm.__exit__ = object()
793        stack = ExitStack()
794        self.assertRaises(AttributeError, stack.enter_context, cm)
795        stack.push(cm)
796        self.assertIs(stack._exit_callbacks[-1], cm)
797
798    def test_dont_reraise_RuntimeError(self):
799        # https://bugs.python.org/issue27122
800        class UniqueException(Exception): pass
801        class UniqueRuntimeError(RuntimeError): pass
802
803        @contextmanager
804        def second():
805            try:
806                yield 1
807            except Exception as exc:
808                raise UniqueException("new exception") from exc
809
810        @contextmanager
811        def first():
812            try:
813                yield 1
814            except Exception as exc:
815                raise exc
816
817        # The UniqueRuntimeError should be caught by second()'s exception
818        # handler which chain raised a new UniqueException.
819        with self.assertRaises(UniqueException) as err_ctx:
820            with ExitStack() as es_ctx:
821                es_ctx.enter_context(second())
822                es_ctx.enter_context(first())
823                raise UniqueRuntimeError("please no infinite loop.")
824
825        exc = err_ctx.exception
826        self.assertIsInstance(exc, UniqueException)
827        self.assertIsInstance(exc.__context__, UniqueRuntimeError)
828        self.assertIsNone(exc.__context__.__context__)
829        self.assertIsNone(exc.__context__.__cause__)
830        self.assertIs(exc.__cause__, exc.__context__)
831
832
833class TestRedirectStream:
834
835    redirect_stream = None
836    orig_stream = None
837
838    @support.requires_docstrings
839    def test_instance_docs(self):
840        # Issue 19330: ensure context manager instances have good docstrings
841        cm_docstring = self.redirect_stream.__doc__
842        obj = self.redirect_stream(None)
843        self.assertEqual(obj.__doc__, cm_docstring)
844
845    def test_no_redirect_in_init(self):
846        orig_stdout = getattr(sys, self.orig_stream)
847        self.redirect_stream(None)
848        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
849
850    def test_redirect_to_string_io(self):
851        f = io.StringIO()
852        msg = "Consider an API like help(), which prints directly to stdout"
853        orig_stdout = getattr(sys, self.orig_stream)
854        with self.redirect_stream(f):
855            print(msg, file=getattr(sys, self.orig_stream))
856        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
857        s = f.getvalue().strip()
858        self.assertEqual(s, msg)
859
860    def test_enter_result_is_target(self):
861        f = io.StringIO()
862        with self.redirect_stream(f) as enter_result:
863            self.assertIs(enter_result, f)
864
865    def test_cm_is_reusable(self):
866        f = io.StringIO()
867        write_to_f = self.redirect_stream(f)
868        orig_stdout = getattr(sys, self.orig_stream)
869        with write_to_f:
870            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
871        with write_to_f:
872            print("World!", file=getattr(sys, self.orig_stream))
873        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
874        s = f.getvalue()
875        self.assertEqual(s, "Hello World!\n")
876
877    def test_cm_is_reentrant(self):
878        f = io.StringIO()
879        write_to_f = self.redirect_stream(f)
880        orig_stdout = getattr(sys, self.orig_stream)
881        with write_to_f:
882            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
883            with write_to_f:
884                print("World!", file=getattr(sys, self.orig_stream))
885        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
886        s = f.getvalue()
887        self.assertEqual(s, "Hello World!\n")
888
889
class TestRedirectStdout(TestRedirectStream, unittest.TestCase):
    """Run the shared redirection tests for redirect_stdout / sys.stdout."""

    orig_stream = "stdout"
    redirect_stream = redirect_stdout
894
895
class TestRedirectStderr(TestRedirectStream, unittest.TestCase):
    """Run the shared redirection tests for redirect_stderr / sys.stderr."""

    orig_stream = "stderr"
    redirect_stream = redirect_stderr
900
901
class TestSuppress(unittest.TestCase):
    """Tests for contextlib.suppress."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        instance = suppress()
        self.assertEqual(instance.__doc__, suppress.__doc__)

    def test_no_result_from_enter(self):
        """Entering the manager yields None."""
        with suppress(ValueError) as entered:
            self.assertIsNone(entered)

    def test_no_exception(self):
        """A body that does not raise runs normally."""
        with suppress(ValueError):
            self.assertEqual(pow(2, 5), 32)

    def test_exact_exception(self):
        """An exception of exactly the listed type is swallowed."""
        with suppress(TypeError):
            len(5)

    def test_exception_hierarchy(self):
        """A subclass of the listed type is swallowed too."""
        with suppress(LookupError):
            'Hello'[50]

    def test_other_exception(self):
        """Exceptions of an unlisted type propagate."""
        with self.assertRaises(ZeroDivisionError):
            with suppress(TypeError):
                1/0

    def test_no_args(self):
        """With no types listed, nothing is suppressed."""
        with self.assertRaises(ZeroDivisionError):
            with suppress():
                1/0

    def test_multiple_exception_args(self):
        """Every listed type is suppressed."""
        for trigger in (lambda: 1/0, lambda: len(5)):
            with suppress(ZeroDivisionError, TypeError):
                trigger()

    def test_cm_is_reentrant(self):
        """One instance works when reused and when nested in itself."""
        swallow = suppress(Exception)
        with swallow:
            pass
        with swallow:
            len(5)
        with swallow:
            with swallow: # Check nested usage
                len(5)
            # Reached only if the inner block swallowed its exception.
            outer_continued = True
            1/0
        self.assertTrue(outer_continued)
955
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
958