1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as a attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22from __future__ import print_function
23from __future__ import unicode_literals
24
25import os
26import sys
27import time
28import array
29import random
30import unittest
31import weakref
32import abc
33import signal
34import errno
35from itertools import cycle, count
36from collections import deque
37from UserList import UserList
38from test import test_support as support
39import contextlib
40
41import codecs
42import io  # C implementation of io
43import _pyio as pyio # Python implementation of io
44try:
45    import threading
46except ImportError:
47    threading = None
48try:
49    import fcntl
50except ImportError:
51    fcntl = None
52
53__metaclass__ = type
54bytes = support.py3k_bytes
55
56def _default_chunk_size():
57    """Get the default TextIOWrapper chunk size"""
58    with io.open(__file__, "r", encoding="latin1") as f:
59        return f._CHUNK_SIZE
60
61
62class MockRawIOWithoutRead:
63    """A RawIO implementation without read(), so as to exercise the default
64    RawIO.read() which calls readinto()."""
65
66    def __init__(self, read_stack=()):
67        self._read_stack = list(read_stack)
68        self._write_stack = []
69        self._reads = 0
70        self._extraneous_reads = 0
71
72    def write(self, b):
73        self._write_stack.append(bytes(b))
74        return len(b)
75
76    def writable(self):
77        return True
78
79    def fileno(self):
80        return 42
81
82    def readable(self):
83        return True
84
85    def seekable(self):
86        return True
87
88    def seek(self, pos, whence):
89        return 0   # wrong but we gotta return something
90
91    def tell(self):
92        return 0   # same comment as above
93
94    def readinto(self, buf):
95        self._reads += 1
96        max_len = len(buf)
97        try:
98            data = self._read_stack[0]
99        except IndexError:
100            self._extraneous_reads += 1
101            return 0
102        if data is None:
103            del self._read_stack[0]
104            return None
105        n = len(data)
106        if len(data) <= max_len:
107            del self._read_stack[0]
108            buf[:n] = data
109            return n
110        else:
111            buf[:] = data[:max_len]
112            self._read_stack[0] = data[max_len:]
113            return max_len
114
115    def truncate(self, pos=None):
116        return pos
117
118class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
119    pass
120
121class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
122    pass
123
124
125class MockRawIO(MockRawIOWithoutRead):
126
127    def read(self, n=None):
128        self._reads += 1
129        try:
130            return self._read_stack.pop(0)
131        except:
132            self._extraneous_reads += 1
133            return b""
134
135class CMockRawIO(MockRawIO, io.RawIOBase):
136    pass
137
138class PyMockRawIO(MockRawIO, pyio.RawIOBase):
139    pass
140
141
142class MisbehavedRawIO(MockRawIO):
143    def write(self, b):
144        return MockRawIO.write(self, b) * 2
145
146    def read(self, n=None):
147        return MockRawIO.read(self, n) * 2
148
149    def seek(self, pos, whence):
150        return -123
151
152    def tell(self):
153        return -456
154
155    def readinto(self, buf):
156        MockRawIO.readinto(self, buf)
157        return len(buf) * 5
158
159class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
160    pass
161
162class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
163    pass
164
165
166class CloseFailureIO(MockRawIO):
167    closed = 0
168
169    def close(self):
170        if not self.closed:
171            self.closed = 1
172            raise IOError
173
174class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
175    pass
176
177class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
178    pass
179
180
181class MockFileIO:
182
183    def __init__(self, data):
184        self.read_history = []
185        super(MockFileIO, self).__init__(data)
186
187    def read(self, n=None):
188        res = super(MockFileIO, self).read(n)
189        self.read_history.append(None if res is None else len(res))
190        return res
191
192    def readinto(self, b):
193        res = super(MockFileIO, self).readinto(b)
194        self.read_history.append(res)
195        return res
196
197class CMockFileIO(MockFileIO, io.BytesIO):
198    pass
199
200class PyMockFileIO(MockFileIO, pyio.BytesIO):
201    pass
202
203
204class MockNonBlockWriterIO:
205
206    def __init__(self):
207        self._write_stack = []
208        self._blocker_char = None
209
210    def pop_written(self):
211        s = b"".join(self._write_stack)
212        self._write_stack[:] = []
213        return s
214
215    def block_on(self, char):
216        """Block when a given char is encountered."""
217        self._blocker_char = char
218
219    def readable(self):
220        return True
221
222    def seekable(self):
223        return True
224
225    def writable(self):
226        return True
227
228    def write(self, b):
229        b = bytes(b)
230        n = -1
231        if self._blocker_char:
232            try:
233                n = b.index(self._blocker_char)
234            except ValueError:
235                pass
236            else:
237                if n > 0:
238                    # write data up to the first blocker
239                    self._write_stack.append(b[:n])
240                    return n
241                else:
242                    # cancel blocker and indicate would block
243                    self._blocker_char = None
244                    return None
245        self._write_stack.append(b)
246        return len(b)
247
248class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
249    BlockingIOError = io.BlockingIOError
250
251class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
252    BlockingIOError = pyio.BlockingIOError
253
254
255class IOTest(unittest.TestCase):
256
257    def setUp(self):
258        support.unlink(support.TESTFN)
259
260    def tearDown(self):
261        support.unlink(support.TESTFN)
262
263    def write_ops(self, f):
264        self.assertEqual(f.write(b"blah."), 5)
265        f.truncate(0)
266        self.assertEqual(f.tell(), 5)
267        f.seek(0)
268
269        self.assertEqual(f.write(b"blah."), 5)
270        self.assertEqual(f.seek(0), 0)
271        self.assertEqual(f.write(b"Hello."), 6)
272        self.assertEqual(f.tell(), 6)
273        self.assertEqual(f.seek(-1, 1), 5)
274        self.assertEqual(f.tell(), 5)
275        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
276        self.assertEqual(f.seek(0), 0)
277        self.assertEqual(f.write(b"h"), 1)
278        self.assertEqual(f.seek(-1, 2), 13)
279        self.assertEqual(f.tell(), 13)
280
281        self.assertEqual(f.truncate(12), 12)
282        self.assertEqual(f.tell(), 13)
283        self.assertRaises(TypeError, f.seek, 0.0)
284
285    def read_ops(self, f, buffered=False):
286        data = f.read(5)
287        self.assertEqual(data, b"hello")
288        data = bytearray(data)
289        self.assertEqual(f.readinto(data), 5)
290        self.assertEqual(data, b" worl")
291        self.assertEqual(f.readinto(data), 2)
292        self.assertEqual(len(data), 5)
293        self.assertEqual(data[:2], b"d\n")
294        self.assertEqual(f.seek(0), 0)
295        self.assertEqual(f.read(20), b"hello world\n")
296        self.assertEqual(f.read(1), b"")
297        self.assertEqual(f.readinto(bytearray(b"x")), 0)
298        self.assertEqual(f.seek(-6, 2), 6)
299        self.assertEqual(f.read(5), b"world")
300        self.assertEqual(f.read(0), b"")
301        self.assertEqual(f.readinto(bytearray()), 0)
302        self.assertEqual(f.seek(-6, 1), 5)
303        self.assertEqual(f.read(5), b" worl")
304        self.assertEqual(f.tell(), 10)
305        self.assertRaises(TypeError, f.seek, 0.0)
306        if buffered:
307            f.seek(0)
308            self.assertEqual(f.read(), b"hello world\n")
309            f.seek(6)
310            self.assertEqual(f.read(), b"world\n")
311            self.assertEqual(f.read(), b"")
312
313    LARGE = 2**31
314
315    def large_file_ops(self, f):
316        assert f.readable()
317        assert f.writable()
318        self.assertEqual(f.seek(self.LARGE), self.LARGE)
319        self.assertEqual(f.tell(), self.LARGE)
320        self.assertEqual(f.write(b"xxx"), 3)
321        self.assertEqual(f.tell(), self.LARGE + 3)
322        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
323        self.assertEqual(f.truncate(), self.LARGE + 2)
324        self.assertEqual(f.tell(), self.LARGE + 2)
325        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
326        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
327        self.assertEqual(f.tell(), self.LARGE + 2)
328        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
329        self.assertEqual(f.seek(-1, 2), self.LARGE)
330        self.assertEqual(f.read(2), b"x")
331
332    def test_invalid_operations(self):
333        # Try writing on a file opened in read mode and vice-versa.
334        for mode in ("w", "wb"):
335            with self.open(support.TESTFN, mode) as fp:
336                self.assertRaises(IOError, fp.read)
337                self.assertRaises(IOError, fp.readline)
338        with self.open(support.TESTFN, "rb") as fp:
339            self.assertRaises(IOError, fp.write, b"blah")
340            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
341        with self.open(support.TESTFN, "r") as fp:
342            self.assertRaises(IOError, fp.write, "blah")
343            self.assertRaises(IOError, fp.writelines, ["blah\n"])
344
345    def test_raw_file_io(self):
346        with self.open(support.TESTFN, "wb", buffering=0) as f:
347            self.assertEqual(f.readable(), False)
348            self.assertEqual(f.writable(), True)
349            self.assertEqual(f.seekable(), True)
350            self.write_ops(f)
351        with self.open(support.TESTFN, "rb", buffering=0) as f:
352            self.assertEqual(f.readable(), True)
353            self.assertEqual(f.writable(), False)
354            self.assertEqual(f.seekable(), True)
355            self.read_ops(f)
356
357    def test_buffered_file_io(self):
358        with self.open(support.TESTFN, "wb") as f:
359            self.assertEqual(f.readable(), False)
360            self.assertEqual(f.writable(), True)
361            self.assertEqual(f.seekable(), True)
362            self.write_ops(f)
363        with self.open(support.TESTFN, "rb") as f:
364            self.assertEqual(f.readable(), True)
365            self.assertEqual(f.writable(), False)
366            self.assertEqual(f.seekable(), True)
367            self.read_ops(f, True)
368
369    def test_readline(self):
370        with self.open(support.TESTFN, "wb") as f:
371            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
372        with self.open(support.TESTFN, "rb") as f:
373            self.assertEqual(f.readline(), b"abc\n")
374            self.assertEqual(f.readline(10), b"def\n")
375            self.assertEqual(f.readline(2), b"xy")
376            self.assertEqual(f.readline(4), b"zzy\n")
377            self.assertEqual(f.readline(), b"foo\x00bar\n")
378            self.assertEqual(f.readline(None), b"another line")
379            self.assertRaises(TypeError, f.readline, 5.3)
380        with self.open(support.TESTFN, "r") as f:
381            self.assertRaises(TypeError, f.readline, 5.3)
382
383    def test_raw_bytes_io(self):
384        f = self.BytesIO()
385        self.write_ops(f)
386        data = f.getvalue()
387        self.assertEqual(data, b"hello world\n")
388        f = self.BytesIO(data)
389        self.read_ops(f, True)
390
391    def test_large_file_ops(self):
392        # On Windows and Mac OSX this test comsumes large resources; It takes
393        # a long time to build the >2GB file and takes >2GB of disk space
394        # therefore the resource must be enabled to run this test.
395        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
396            if not support.is_resource_enabled("largefile"):
397                print("\nTesting large file ops skipped on %s." % sys.platform,
398                      file=sys.stderr)
399                print("It requires %d bytes and a long time." % self.LARGE,
400                      file=sys.stderr)
401                print("Use 'regrtest.py -u largefile test_io' to run it.",
402                      file=sys.stderr)
403                return
404        with self.open(support.TESTFN, "w+b", 0) as f:
405            self.large_file_ops(f)
406        with self.open(support.TESTFN, "w+b") as f:
407            self.large_file_ops(f)
408
409    def test_with_open(self):
410        for bufsize in (0, 1, 100):
411            f = None
412            with self.open(support.TESTFN, "wb", bufsize) as f:
413                f.write(b"xxx")
414            self.assertEqual(f.closed, True)
415            f = None
416            try:
417                with self.open(support.TESTFN, "wb", bufsize) as f:
418                    1 // 0
419            except ZeroDivisionError:
420                self.assertEqual(f.closed, True)
421            else:
422                self.fail("1 // 0 didn't raise an exception")
423
424    # issue 5008
425    def test_append_mode_tell(self):
426        with self.open(support.TESTFN, "wb") as f:
427            f.write(b"xxx")
428        with self.open(support.TESTFN, "ab", buffering=0) as f:
429            self.assertEqual(f.tell(), 3)
430        with self.open(support.TESTFN, "ab") as f:
431            self.assertEqual(f.tell(), 3)
432        with self.open(support.TESTFN, "a") as f:
433            self.assertTrue(f.tell() > 0)
434
435    def test_destructor(self):
436        record = []
437        class MyFileIO(self.FileIO):
438            def __del__(self):
439                record.append(1)
440                try:
441                    f = super(MyFileIO, self).__del__
442                except AttributeError:
443                    pass
444                else:
445                    f()
446            def close(self):
447                record.append(2)
448                super(MyFileIO, self).close()
449            def flush(self):
450                record.append(3)
451                super(MyFileIO, self).flush()
452        f = MyFileIO(support.TESTFN, "wb")
453        f.write(b"xxx")
454        del f
455        support.gc_collect()
456        self.assertEqual(record, [1, 2, 3])
457        with self.open(support.TESTFN, "rb") as f:
458            self.assertEqual(f.read(), b"xxx")
459
460    def _check_base_destructor(self, base):
461        record = []
462        class MyIO(base):
463            def __init__(self):
464                # This exercises the availability of attributes on object
465                # destruction.
466                # (in the C version, close() is called by the tp_dealloc
467                # function, not by __del__)
468                self.on_del = 1
469                self.on_close = 2
470                self.on_flush = 3
471            def __del__(self):
472                record.append(self.on_del)
473                try:
474                    f = super(MyIO, self).__del__
475                except AttributeError:
476                    pass
477                else:
478                    f()
479            def close(self):
480                record.append(self.on_close)
481                super(MyIO, self).close()
482            def flush(self):
483                record.append(self.on_flush)
484                super(MyIO, self).flush()
485        f = MyIO()
486        del f
487        support.gc_collect()
488        self.assertEqual(record, [1, 2, 3])
489
490    def test_IOBase_destructor(self):
491        self._check_base_destructor(self.IOBase)
492
493    def test_RawIOBase_destructor(self):
494        self._check_base_destructor(self.RawIOBase)
495
496    def test_BufferedIOBase_destructor(self):
497        self._check_base_destructor(self.BufferedIOBase)
498
499    def test_TextIOBase_destructor(self):
500        self._check_base_destructor(self.TextIOBase)
501
502    def test_close_flushes(self):
503        with self.open(support.TESTFN, "wb") as f:
504            f.write(b"xxx")
505        with self.open(support.TESTFN, "rb") as f:
506            self.assertEqual(f.read(), b"xxx")
507
508    def test_array_writes(self):
509        a = array.array(b'i', range(10))
510        n = len(a.tostring())
511        with self.open(support.TESTFN, "wb", 0) as f:
512            self.assertEqual(f.write(a), n)
513        with self.open(support.TESTFN, "wb") as f:
514            self.assertEqual(f.write(a), n)
515
516    def test_closefd(self):
517        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
518                          closefd=False)
519
520    def test_read_closed(self):
521        with self.open(support.TESTFN, "w") as f:
522            f.write("egg\n")
523        with self.open(support.TESTFN, "r") as f:
524            file = self.open(f.fileno(), "r", closefd=False)
525            self.assertEqual(file.read(), "egg\n")
526            file.seek(0)
527            file.close()
528            self.assertRaises(ValueError, file.read)
529
530    def test_no_closefd_with_filename(self):
531        # can't use closefd in combination with a file name
532        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
533
534    def test_closefd_attr(self):
535        with self.open(support.TESTFN, "wb") as f:
536            f.write(b"egg\n")
537        with self.open(support.TESTFN, "r") as f:
538            self.assertEqual(f.buffer.raw.closefd, True)
539            file = self.open(f.fileno(), "r", closefd=False)
540            self.assertEqual(file.buffer.raw.closefd, False)
541
542    def test_garbage_collection(self):
543        # FileIO objects are collected, and collecting them flushes
544        # all data to disk.
545        f = self.FileIO(support.TESTFN, "wb")
546        f.write(b"abcxxx")
547        f.f = f
548        wr = weakref.ref(f)
549        del f
550        support.gc_collect()
551        self.assertTrue(wr() is None, wr)
552        with self.open(support.TESTFN, "rb") as f:
553            self.assertEqual(f.read(), b"abcxxx")
554
555    def test_unbounded_file(self):
556        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
557        zero = "/dev/zero"
558        if not os.path.exists(zero):
559            self.skipTest("{0} does not exist".format(zero))
560        if sys.maxsize > 0x7FFFFFFF:
561            self.skipTest("test can only run in a 32-bit address space")
562        if support.real_max_memuse < support._2G:
563            self.skipTest("test requires at least 2GB of memory")
564        with self.open(zero, "rb", buffering=0) as f:
565            self.assertRaises(OverflowError, f.read)
566        with self.open(zero, "rb") as f:
567            self.assertRaises(OverflowError, f.read)
568        with self.open(zero, "r") as f:
569            self.assertRaises(OverflowError, f.read)
570
571    def test_flush_error_on_close(self):
572        f = self.open(support.TESTFN, "wb", buffering=0)
573        def bad_flush():
574            raise IOError()
575        f.flush = bad_flush
576        self.assertRaises(IOError, f.close) # exception not swallowed
577        self.assertTrue(f.closed)
578
579    def test_multi_close(self):
580        f = self.open(support.TESTFN, "wb", buffering=0)
581        f.close()
582        f.close()
583        f.close()
584        self.assertRaises(ValueError, f.flush)
585
586    def test_RawIOBase_read(self):
587        # Exercise the default RawIOBase.read() implementation (which calls
588        # readinto() internally).
589        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
590        self.assertEqual(rawio.read(2), b"ab")
591        self.assertEqual(rawio.read(2), b"c")
592        self.assertEqual(rawio.read(2), b"d")
593        self.assertEqual(rawio.read(2), None)
594        self.assertEqual(rawio.read(2), b"ef")
595        self.assertEqual(rawio.read(2), b"g")
596        self.assertEqual(rawio.read(2), None)
597        self.assertEqual(rawio.read(2), b"")
598
599    def test_fileio_closefd(self):
600        # Issue #4841
601        with self.open(__file__, 'rb') as f1, \
602             self.open(__file__, 'rb') as f2:
603            fileio = self.FileIO(f1.fileno(), closefd=False)
604            # .__init__() must not close f1
605            fileio.__init__(f2.fileno(), closefd=False)
606            f1.readline()
607            # .close() must not close f2
608            fileio.close()
609            f2.readline()
610
611
612class CIOTest(IOTest):
613
614    def test_IOBase_finalize(self):
615        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
616        # class which inherits IOBase and an object of this class are caught
617        # in a reference cycle and close() is already in the method cache.
618        class MyIO(self.IOBase):
619            def close(self):
620                pass
621
622        # create an instance to populate the method cache
623        MyIO()
624        obj = MyIO()
625        obj.obj = obj
626        wr = weakref.ref(obj)
627        del MyIO
628        del obj
629        support.gc_collect()
630        self.assertTrue(wr() is None, wr)
631
632class PyIOTest(IOTest):
633    test_array_writes = unittest.skip(
634        "len(array.array) returns number of elements rather than bytelength"
635    )(IOTest.test_array_writes)
636
637
638class CommonBufferedTests:
639    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
640
641    def test_detach(self):
642        raw = self.MockRawIO()
643        buf = self.tp(raw)
644        self.assertIs(buf.detach(), raw)
645        self.assertRaises(ValueError, buf.detach)
646
647    def test_fileno(self):
648        rawio = self.MockRawIO()
649        bufio = self.tp(rawio)
650
651        self.assertEqual(42, bufio.fileno())
652
653    def test_no_fileno(self):
654        # XXX will we always have fileno() function? If so, kill
655        # this test. Else, write it.
656        pass
657
658    def test_invalid_args(self):
659        rawio = self.MockRawIO()
660        bufio = self.tp(rawio)
661        # Invalid whence
662        self.assertRaises(ValueError, bufio.seek, 0, -1)
663        self.assertRaises(ValueError, bufio.seek, 0, 3)
664
665    def test_override_destructor(self):
666        tp = self.tp
667        record = []
668        class MyBufferedIO(tp):
669            def __del__(self):
670                record.append(1)
671                try:
672                    f = super(MyBufferedIO, self).__del__
673                except AttributeError:
674                    pass
675                else:
676                    f()
677            def close(self):
678                record.append(2)
679                super(MyBufferedIO, self).close()
680            def flush(self):
681                record.append(3)
682                super(MyBufferedIO, self).flush()
683        rawio = self.MockRawIO()
684        bufio = MyBufferedIO(rawio)
685        writable = bufio.writable()
686        del bufio
687        support.gc_collect()
688        if writable:
689            self.assertEqual(record, [1, 2, 3])
690        else:
691            self.assertEqual(record, [1, 2])
692
693    def test_context_manager(self):
694        # Test usability as a context manager
695        rawio = self.MockRawIO()
696        bufio = self.tp(rawio)
697        def _with():
698            with bufio:
699                pass
700        _with()
701        # bufio should now be closed, and using it a second time should raise
702        # a ValueError.
703        self.assertRaises(ValueError, _with)
704
705    def test_error_through_destructor(self):
706        # Test that the exception state is not modified by a destructor,
707        # even if close() fails.
708        rawio = self.CloseFailureIO()
709        def f():
710            self.tp(rawio).xyzzy
711        with support.captured_output("stderr") as s:
712            self.assertRaises(AttributeError, f)
713        s = s.getvalue().strip()
714        if s:
715            # The destructor *may* have printed an unraisable error, check it
716            self.assertEqual(len(s.splitlines()), 1)
717            self.assertTrue(s.startswith("Exception IOError: "), s)
718            self.assertTrue(s.endswith(" ignored"), s)
719
720    def test_repr(self):
721        raw = self.MockRawIO()
722        b = self.tp(raw)
723        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
724        self.assertEqual(repr(b), "<%s>" % clsname)
725        raw.name = "dummy"
726        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
727        raw.name = b"dummy"
728        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
729
730    def test_flush_error_on_close(self):
731        raw = self.MockRawIO()
732        def bad_flush():
733            raise IOError()
734        raw.flush = bad_flush
735        b = self.tp(raw)
736        self.assertRaises(IOError, b.close) # exception not swallowed
737        self.assertTrue(b.closed)
738
739    def test_close_error_on_close(self):
740        raw = self.MockRawIO()
741        def bad_flush():
742            raise IOError('flush')
743        def bad_close():
744            raise IOError('close')
745        raw.close = bad_close
746        b = self.tp(raw)
747        b.flush = bad_flush
748        with self.assertRaises(IOError) as err: # exception not swallowed
749            b.close()
750        self.assertEqual(err.exception.args, ('close',))
751        self.assertFalse(b.closed)
752
753    def test_multi_close(self):
754        raw = self.MockRawIO()
755        b = self.tp(raw)
756        b.close()
757        b.close()
758        b.close()
759        self.assertRaises(ValueError, b.flush)
760
761    def test_readonly_attributes(self):
762        raw = self.MockRawIO()
763        buf = self.tp(raw)
764        x = self.MockRawIO()
765        with self.assertRaises((AttributeError, TypeError)):
766            buf.raw = x
767
768
769class SizeofTest:
770
771    @support.cpython_only
772    def test_sizeof(self):
773        bufsize1 = 4096
774        bufsize2 = 8192
775        rawio = self.MockRawIO()
776        bufio = self.tp(rawio, buffer_size=bufsize1)
777        size = sys.getsizeof(bufio) - bufsize1
778        rawio = self.MockRawIO()
779        bufio = self.tp(rawio, buffer_size=bufsize2)
780        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
781
782
783class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
784    read_mode = "rb"
785
786    def test_constructor(self):
787        rawio = self.MockRawIO([b"abc"])
788        bufio = self.tp(rawio)
789        bufio.__init__(rawio)
790        bufio.__init__(rawio, buffer_size=1024)
791        bufio.__init__(rawio, buffer_size=16)
792        self.assertEqual(b"abc", bufio.read())
793        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
794        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
795        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
796        rawio = self.MockRawIO([b"abc"])
797        bufio.__init__(rawio)
798        self.assertEqual(b"abc", bufio.read())
799
800    def test_read(self):
801        for arg in (None, 7):
802            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
803            bufio = self.tp(rawio)
804            self.assertEqual(b"abcdefg", bufio.read(arg))
805        # Invalid args
806        self.assertRaises(ValueError, bufio.read, -2)
807
808    def test_read1(self):
809        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
810        bufio = self.tp(rawio)
811        self.assertEqual(b"a", bufio.read(1))
812        self.assertEqual(b"b", bufio.read1(1))
813        self.assertEqual(rawio._reads, 1)
814        self.assertEqual(b"c", bufio.read1(100))
815        self.assertEqual(rawio._reads, 1)
816        self.assertEqual(b"d", bufio.read1(100))
817        self.assertEqual(rawio._reads, 2)
818        self.assertEqual(b"efg", bufio.read1(100))
819        self.assertEqual(rawio._reads, 3)
820        self.assertEqual(b"", bufio.read1(100))
821        self.assertEqual(rawio._reads, 4)
822        # Invalid args
823        self.assertRaises(ValueError, bufio.read1, -1)
824
825    def test_readinto(self):
826        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
827        bufio = self.tp(rawio)
828        b = bytearray(2)
829        self.assertEqual(bufio.readinto(b), 2)
830        self.assertEqual(b, b"ab")
831        self.assertEqual(bufio.readinto(b), 2)
832        self.assertEqual(b, b"cd")
833        self.assertEqual(bufio.readinto(b), 2)
834        self.assertEqual(b, b"ef")
835        self.assertEqual(bufio.readinto(b), 1)
836        self.assertEqual(b, b"gf")
837        self.assertEqual(bufio.readinto(b), 0)
838        self.assertEqual(b, b"gf")
839
840    def test_readlines(self):
841        def bufio():
842            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
843            return self.tp(rawio)
844        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
845        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
846        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
847
848    def test_buffering(self):
849        data = b"abcdefghi"
850        dlen = len(data)
851
852        tests = [
853            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
854            [ 100, [ 3, 3, 3],     [ dlen ]    ],
855            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
856        ]
857
858        for bufsize, buf_read_sizes, raw_read_sizes in tests:
859            rawio = self.MockFileIO(data)
860            bufio = self.tp(rawio, buffer_size=bufsize)
861            pos = 0
862            for nbytes in buf_read_sizes:
863                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
864                pos += nbytes
865            # this is mildly implementation-dependent
866            self.assertEqual(rawio.read_history, raw_read_sizes)
867
868    def test_read_non_blocking(self):
869        # Inject some None's in there to simulate EWOULDBLOCK
870        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
871        bufio = self.tp(rawio)
872        self.assertEqual(b"abcd", bufio.read(6))
873        self.assertEqual(b"e", bufio.read(1))
874        self.assertEqual(b"fg", bufio.read())
875        self.assertEqual(b"", bufio.peek(1))
876        self.assertIsNone(bufio.read())
877        self.assertEqual(b"", bufio.read())
878
879        rawio = self.MockRawIO((b"a", None, None))
880        self.assertEqual(b"a", rawio.readall())
881        self.assertIsNone(rawio.readall())
882
883    def test_read_past_eof(self):
884        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
885        bufio = self.tp(rawio)
886
887        self.assertEqual(b"abcdefg", bufio.read(9000))
888
889    def test_read_all(self):
890        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
891        bufio = self.tp(rawio)
892
893        self.assertEqual(b"abcdefg", bufio.read())
894
895    @unittest.skipUnless(threading, 'Threading required for this test.')
896    @support.requires_resource('cpu')
897    def test_threads(self):
898        try:
899            # Write out many bytes with exactly the same number of 0's,
900            # 1's... 255's. This will help us check that concurrent reading
901            # doesn't duplicate or forget contents.
902            N = 1000
903            l = list(range(256)) * N
904            random.shuffle(l)
905            s = bytes(bytearray(l))
906            with self.open(support.TESTFN, "wb") as f:
907                f.write(s)
908            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
909                bufio = self.tp(raw, 8)
910                errors = []
911                results = []
912                def f():
913                    try:
914                        # Intra-buffer read then buffer-flushing read
915                        for n in cycle([1, 19]):
916                            s = bufio.read(n)
917                            if not s:
918                                break
919                            # list.append() is atomic
920                            results.append(s)
921                    except Exception as e:
922                        errors.append(e)
923                        raise
924                threads = [threading.Thread(target=f) for x in range(20)]
925                for t in threads:
926                    t.start()
927                time.sleep(0.02) # yield
928                for t in threads:
929                    t.join()
930                self.assertFalse(errors,
931                    "the following exceptions were caught: %r" % errors)
932                s = b''.join(results)
933                for i in range(256):
934                    c = bytes(bytearray([i]))
935                    self.assertEqual(s.count(c), N)
936        finally:
937            support.unlink(support.TESTFN)
938
939    def test_misbehaved_io(self):
940        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
941        bufio = self.tp(rawio)
942        self.assertRaises(IOError, bufio.seek, 0)
943        self.assertRaises(IOError, bufio.tell)
944
945    def test_no_extraneous_read(self):
946        # Issue #9550; when the raw IO object has satisfied the read request,
947        # we should not issue any additional reads, otherwise it may block
948        # (e.g. socket).
949        bufsize = 16
950        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
951            rawio = self.MockRawIO([b"x" * n])
952            bufio = self.tp(rawio, bufsize)
953            self.assertEqual(bufio.read(n), b"x" * n)
954            # Simple case: one raw read is enough to satisfy the request.
955            self.assertEqual(rawio._extraneous_reads, 0,
956                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
957            # A more complex case where two raw reads are needed to satisfy
958            # the request.
959            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
960            bufio = self.tp(rawio, bufsize)
961            self.assertEqual(bufio.read(n), b"x" * n)
962            self.assertEqual(rawio._extraneous_reads, 0,
963                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
964
965
966class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
967    tp = io.BufferedReader
968
969    def test_constructor(self):
970        BufferedReaderTest.test_constructor(self)
971        # The allocation can succeed on 32-bit builds, e.g. with more
972        # than 2GB RAM and a 64-bit kernel.
973        if sys.maxsize > 0x7FFFFFFF:
974            rawio = self.MockRawIO()
975            bufio = self.tp(rawio)
976            self.assertRaises((OverflowError, MemoryError, ValueError),
977                bufio.__init__, rawio, sys.maxsize)
978
979    def test_initialization(self):
980        rawio = self.MockRawIO([b"abc"])
981        bufio = self.tp(rawio)
982        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
983        self.assertRaises(ValueError, bufio.read)
984        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
985        self.assertRaises(ValueError, bufio.read)
986        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
987        self.assertRaises(ValueError, bufio.read)
988
989    def test_misbehaved_io_read(self):
990        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
991        bufio = self.tp(rawio)
992        # _pyio.BufferedReader seems to implement reading different, so that
993        # checking this is not so easy.
994        self.assertRaises(IOError, bufio.read, 10)
995
996    def test_garbage_collection(self):
997        # C BufferedReader objects are collected.
998        # The Python version has __del__, so it ends into gc.garbage instead
999        rawio = self.FileIO(support.TESTFN, "w+b")
1000        f = self.tp(rawio)
1001        f.f = f
1002        wr = weakref.ref(f)
1003        del f
1004        support.gc_collect()
1005        self.assertTrue(wr() is None, wr)
1006
1007    def test_args_error(self):
1008        # Issue #17275
1009        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
1010            self.tp(io.BytesIO(), 1024, 1024, 1024)
1011
1012
1013class PyBufferedReaderTest(BufferedReaderTest):
1014    tp = pyio.BufferedReader
1015
1016
1017class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1018    write_mode = "wb"
1019
1020    def test_constructor(self):
1021        rawio = self.MockRawIO()
1022        bufio = self.tp(rawio)
1023        bufio.__init__(rawio)
1024        bufio.__init__(rawio, buffer_size=1024)
1025        bufio.__init__(rawio, buffer_size=16)
1026        self.assertEqual(3, bufio.write(b"abc"))
1027        bufio.flush()
1028        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1029        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1030        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1031        bufio.__init__(rawio)
1032        self.assertEqual(3, bufio.write(b"ghi"))
1033        bufio.flush()
1034        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
1035
1036    def test_detach_flush(self):
1037        raw = self.MockRawIO()
1038        buf = self.tp(raw)
1039        buf.write(b"howdy!")
1040        self.assertFalse(raw._write_stack)
1041        buf.detach()
1042        self.assertEqual(raw._write_stack, [b"howdy!"])
1043
1044    def test_write(self):
1045        # Write to the buffered IO but don't overflow the buffer.
1046        writer = self.MockRawIO()
1047        bufio = self.tp(writer, 8)
1048        bufio.write(b"abc")
1049        self.assertFalse(writer._write_stack)
1050
1051    def test_write_overflow(self):
1052        writer = self.MockRawIO()
1053        bufio = self.tp(writer, 8)
1054        contents = b"abcdefghijklmnop"
1055        for n in range(0, len(contents), 3):
1056            bufio.write(contents[n:n+3])
1057        flushed = b"".join(writer._write_stack)
1058        # At least (total - 8) bytes were implicitly flushed, perhaps more
1059        # depending on the implementation.
1060        self.assertTrue(flushed.startswith(contents[:-8]), flushed)
1061
1062    def check_writes(self, intermediate_func):
1063        # Lots of writes, test the flushed output is as expected.
1064        contents = bytes(range(256)) * 1000
1065        n = 0
1066        writer = self.MockRawIO()
1067        bufio = self.tp(writer, 13)
1068        # Generator of write sizes: repeat each N 15 times then proceed to N+1
1069        def gen_sizes():
1070            for size in count(1):
1071                for i in range(15):
1072                    yield size
1073        sizes = gen_sizes()
1074        while n < len(contents):
1075            size = min(next(sizes), len(contents) - n)
1076            self.assertEqual(bufio.write(contents[n:n+size]), size)
1077            intermediate_func(bufio)
1078            n += size
1079        bufio.flush()
1080        self.assertEqual(contents,
1081            b"".join(writer._write_stack))
1082
1083    def test_writes(self):
1084        self.check_writes(lambda bufio: None)
1085
1086    def test_writes_and_flushes(self):
1087        self.check_writes(lambda bufio: bufio.flush())
1088
1089    def test_writes_and_seeks(self):
1090        def _seekabs(bufio):
1091            pos = bufio.tell()
1092            bufio.seek(pos + 1, 0)
1093            bufio.seek(pos - 1, 0)
1094            bufio.seek(pos, 0)
1095        self.check_writes(_seekabs)
1096        def _seekrel(bufio):
1097            pos = bufio.seek(0, 1)
1098            bufio.seek(+1, 1)
1099            bufio.seek(-1, 1)
1100            bufio.seek(pos, 0)
1101        self.check_writes(_seekrel)
1102
1103    def test_writes_and_truncates(self):
1104        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1105
1106    def test_write_non_blocking(self):
1107        raw = self.MockNonBlockWriterIO()
1108        bufio = self.tp(raw, 8)
1109
1110        self.assertEqual(bufio.write(b"abcd"), 4)
1111        self.assertEqual(bufio.write(b"efghi"), 5)
1112        # 1 byte will be written, the rest will be buffered
1113        raw.block_on(b"k")
1114        self.assertEqual(bufio.write(b"jklmn"), 5)
1115
1116        # 8 bytes will be written, 8 will be buffered and the rest will be lost
1117        raw.block_on(b"0")
1118        try:
1119            bufio.write(b"opqrwxyz0123456789")
1120        except self.BlockingIOError as e:
1121            written = e.characters_written
1122        else:
1123            self.fail("BlockingIOError should have been raised")
1124        self.assertEqual(written, 16)
1125        self.assertEqual(raw.pop_written(),
1126            b"abcdefghijklmnopqrwxyz")
1127
1128        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1129        s = raw.pop_written()
1130        # Previously buffered bytes were flushed
1131        self.assertTrue(s.startswith(b"01234567A"), s)
1132
1133    def test_write_and_rewind(self):
1134        raw = io.BytesIO()
1135        bufio = self.tp(raw, 4)
1136        self.assertEqual(bufio.write(b"abcdef"), 6)
1137        self.assertEqual(bufio.tell(), 6)
1138        bufio.seek(0, 0)
1139        self.assertEqual(bufio.write(b"XY"), 2)
1140        bufio.seek(6, 0)
1141        self.assertEqual(raw.getvalue(), b"XYcdef")
1142        self.assertEqual(bufio.write(b"123456"), 6)
1143        bufio.flush()
1144        self.assertEqual(raw.getvalue(), b"XYcdef123456")
1145
1146    def test_flush(self):
1147        writer = self.MockRawIO()
1148        bufio = self.tp(writer, 8)
1149        bufio.write(b"abc")
1150        bufio.flush()
1151        self.assertEqual(b"abc", writer._write_stack[0])
1152
1153    def test_writelines(self):
1154        l = [b'ab', b'cd', b'ef']
1155        writer = self.MockRawIO()
1156        bufio = self.tp(writer, 8)
1157        bufio.writelines(l)
1158        bufio.flush()
1159        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1160
1161    def test_writelines_userlist(self):
1162        l = UserList([b'ab', b'cd', b'ef'])
1163        writer = self.MockRawIO()
1164        bufio = self.tp(writer, 8)
1165        bufio.writelines(l)
1166        bufio.flush()
1167        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1168
1169    def test_writelines_error(self):
1170        writer = self.MockRawIO()
1171        bufio = self.tp(writer, 8)
1172        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1173        self.assertRaises(TypeError, bufio.writelines, None)
1174
1175    def test_destructor(self):
1176        writer = self.MockRawIO()
1177        bufio = self.tp(writer, 8)
1178        bufio.write(b"abc")
1179        del bufio
1180        support.gc_collect()
1181        self.assertEqual(b"abc", writer._write_stack[0])
1182
1183    def test_truncate(self):
1184        # Truncate implicitly flushes the buffer.
1185        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1186            bufio = self.tp(raw, 8)
1187            bufio.write(b"abcdef")
1188            self.assertEqual(bufio.truncate(3), 3)
1189            self.assertEqual(bufio.tell(), 6)
1190        with self.open(support.TESTFN, "rb", buffering=0) as f:
1191            self.assertEqual(f.read(), b"abc")
1192
1193    @unittest.skipUnless(threading, 'Threading required for this test.')
1194    @support.requires_resource('cpu')
1195    def test_threads(self):
1196        try:
1197            # Write out many bytes from many threads and test they were
1198            # all flushed.
1199            N = 1000
1200            contents = bytes(range(256)) * N
1201            sizes = cycle([1, 19])
1202            n = 0
1203            queue = deque()
1204            while n < len(contents):
1205                size = next(sizes)
1206                queue.append(contents[n:n+size])
1207                n += size
1208            del contents
1209            # We use a real file object because it allows us to
1210            # exercise situations where the GIL is released before
1211            # writing the buffer to the raw streams. This is in addition
1212            # to concurrency issues due to switching threads in the middle
1213            # of Python code.
1214            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1215                bufio = self.tp(raw, 8)
1216                errors = []
1217                def f():
1218                    try:
1219                        while True:
1220                            try:
1221                                s = queue.popleft()
1222                            except IndexError:
1223                                return
1224                            bufio.write(s)
1225                    except Exception as e:
1226                        errors.append(e)
1227                        raise
1228                threads = [threading.Thread(target=f) for x in range(20)]
1229                for t in threads:
1230                    t.start()
1231                time.sleep(0.02) # yield
1232                for t in threads:
1233                    t.join()
1234                self.assertFalse(errors,
1235                    "the following exceptions were caught: %r" % errors)
1236                bufio.close()
1237            with self.open(support.TESTFN, "rb") as f:
1238                s = f.read()
1239            for i in range(256):
1240                self.assertEqual(s.count(bytes([i])), N)
1241        finally:
1242            support.unlink(support.TESTFN)
1243
1244    def test_misbehaved_io(self):
1245        rawio = self.MisbehavedRawIO()
1246        bufio = self.tp(rawio, 5)
1247        self.assertRaises(IOError, bufio.seek, 0)
1248        self.assertRaises(IOError, bufio.tell)
1249        self.assertRaises(IOError, bufio.write, b"abcdef")
1250
1251    def test_max_buffer_size_deprecation(self):
1252        with support.check_warnings(("max_buffer_size is deprecated",
1253                                     DeprecationWarning)):
1254            self.tp(self.MockRawIO(), 8, 12)
1255
1256    def test_write_error_on_close(self):
1257        raw = self.MockRawIO()
1258        def bad_write(b):
1259            raise IOError()
1260        raw.write = bad_write
1261        b = self.tp(raw)
1262        b.write(b'spam')
1263        self.assertRaises(IOError, b.close) # exception not swallowed
1264        self.assertTrue(b.closed)
1265
1266
1267class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
1268    tp = io.BufferedWriter
1269
1270    def test_constructor(self):
1271        BufferedWriterTest.test_constructor(self)
1272        # The allocation can succeed on 32-bit builds, e.g. with more
1273        # than 2GB RAM and a 64-bit kernel.
1274        if sys.maxsize > 0x7FFFFFFF:
1275            rawio = self.MockRawIO()
1276            bufio = self.tp(rawio)
1277            self.assertRaises((OverflowError, MemoryError, ValueError),
1278                bufio.__init__, rawio, sys.maxsize)
1279
1280    def test_initialization(self):
1281        rawio = self.MockRawIO()
1282        bufio = self.tp(rawio)
1283        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1284        self.assertRaises(ValueError, bufio.write, b"def")
1285        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1286        self.assertRaises(ValueError, bufio.write, b"def")
1287        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1288        self.assertRaises(ValueError, bufio.write, b"def")
1289
1290    def test_garbage_collection(self):
1291        # C BufferedWriter objects are collected, and collecting them flushes
1292        # all data to disk.
1293        # The Python version has __del__, so it ends into gc.garbage instead
1294        rawio = self.FileIO(support.TESTFN, "w+b")
1295        f = self.tp(rawio)
1296        f.write(b"123xxx")
1297        f.x = f
1298        wr = weakref.ref(f)
1299        del f
1300        support.gc_collect()
1301        self.assertTrue(wr() is None, wr)
1302        with self.open(support.TESTFN, "rb") as f:
1303            self.assertEqual(f.read(), b"123xxx")
1304
1305    def test_args_error(self):
1306        # Issue #17275
1307        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
1308            self.tp(io.BytesIO(), 1024, 1024, 1024)
1309
1310
1311class PyBufferedWriterTest(BufferedWriterTest):
1312    tp = pyio.BufferedWriter
1313
1314class BufferedRWPairTest(unittest.TestCase):
1315
1316    def test_constructor(self):
1317        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1318        self.assertFalse(pair.closed)
1319
1320    def test_detach(self):
1321        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1322        self.assertRaises(self.UnsupportedOperation, pair.detach)
1323
1324    def test_constructor_max_buffer_size_deprecation(self):
1325        with support.check_warnings(("max_buffer_size is deprecated",
1326                                     DeprecationWarning)):
1327            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1328
1329    def test_constructor_with_not_readable(self):
1330        class NotReadable(MockRawIO):
1331            def readable(self):
1332                return False
1333
1334        self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
1335
1336    def test_constructor_with_not_writeable(self):
1337        class NotWriteable(MockRawIO):
1338            def writable(self):
1339                return False
1340
1341        self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
1342
1343    def test_read(self):
1344        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1345
1346        self.assertEqual(pair.read(3), b"abc")
1347        self.assertEqual(pair.read(1), b"d")
1348        self.assertEqual(pair.read(), b"ef")
1349        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1350        self.assertEqual(pair.read(None), b"abc")
1351
1352    def test_readlines(self):
1353        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1354        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1355        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1356        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
1357
1358    def test_read1(self):
1359        # .read1() is delegated to the underlying reader object, so this test
1360        # can be shallow.
1361        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1362
1363        self.assertEqual(pair.read1(3), b"abc")
1364
1365    def test_readinto(self):
1366        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1367
1368        data = bytearray(5)
1369        self.assertEqual(pair.readinto(data), 5)
1370        self.assertEqual(data, b"abcde")
1371
1372    def test_write(self):
1373        w = self.MockRawIO()
1374        pair = self.tp(self.MockRawIO(), w)
1375
1376        pair.write(b"abc")
1377        pair.flush()
1378        pair.write(b"def")
1379        pair.flush()
1380        self.assertEqual(w._write_stack, [b"abc", b"def"])
1381
1382    def test_peek(self):
1383        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1384
1385        self.assertTrue(pair.peek(3).startswith(b"abc"))
1386        self.assertEqual(pair.read(3), b"abc")
1387
1388    def test_readable(self):
1389        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1390        self.assertTrue(pair.readable())
1391
1392    def test_writeable(self):
1393        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1394        self.assertTrue(pair.writable())
1395
1396    def test_seekable(self):
1397        # BufferedRWPairs are never seekable, even if their readers and writers
1398        # are.
1399        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1400        self.assertFalse(pair.seekable())
1401
1402    # .flush() is delegated to the underlying writer object and has been
1403    # tested in the test_write method.
1404
1405    def test_close_and_closed(self):
1406        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1407        self.assertFalse(pair.closed)
1408        pair.close()
1409        self.assertTrue(pair.closed)
1410
1411    def test_isatty(self):
1412        class SelectableIsAtty(MockRawIO):
1413            def __init__(self, isatty):
1414                MockRawIO.__init__(self)
1415                self._isatty = isatty
1416
1417            def isatty(self):
1418                return self._isatty
1419
1420        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
1421        self.assertFalse(pair.isatty())
1422
1423        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
1424        self.assertTrue(pair.isatty())
1425
1426        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
1427        self.assertTrue(pair.isatty())
1428
1429        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
1430        self.assertTrue(pair.isatty())
1431
1432class CBufferedRWPairTest(BufferedRWPairTest):
1433    tp = io.BufferedRWPair
1434
1435class PyBufferedRWPairTest(BufferedRWPairTest):
1436    tp = pyio.BufferedRWPair
1437
1438
1439class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1440    read_mode = "rb+"
1441    write_mode = "wb+"
1442
1443    def test_constructor(self):
1444        BufferedReaderTest.test_constructor(self)
1445        BufferedWriterTest.test_constructor(self)
1446
1447    def test_read_and_write(self):
1448        raw = self.MockRawIO((b"asdf", b"ghjk"))
1449        rw = self.tp(raw, 8)
1450
1451        self.assertEqual(b"as", rw.read(2))
1452        rw.write(b"ddd")
1453        rw.write(b"eee")
1454        self.assertFalse(raw._write_stack) # Buffer writes
1455        self.assertEqual(b"ghjk", rw.read())
1456        self.assertEqual(b"dddeee", raw._write_stack[0])
1457
1458    def test_seek_and_tell(self):
1459        raw = self.BytesIO(b"asdfghjkl")
1460        rw = self.tp(raw)
1461
1462        self.assertEqual(b"as", rw.read(2))
1463        self.assertEqual(2, rw.tell())
1464        rw.seek(0, 0)
1465        self.assertEqual(b"asdf", rw.read(4))
1466
1467        rw.write(b"123f")
1468        rw.seek(0, 0)
1469        self.assertEqual(b"asdf123fl", rw.read())
1470        self.assertEqual(9, rw.tell())
1471        rw.seek(-4, 2)
1472        self.assertEqual(5, rw.tell())
1473        rw.seek(2, 1)
1474        self.assertEqual(7, rw.tell())
1475        self.assertEqual(b"fl", rw.read(11))
1476        rw.flush()
1477        self.assertEqual(b"asdf123fl", raw.getvalue())
1478
1479        self.assertRaises(TypeError, rw.seek, 0.0)
1480
1481    def check_flush_and_read(self, read_func):
1482        raw = self.BytesIO(b"abcdefghi")
1483        bufio = self.tp(raw)
1484
1485        self.assertEqual(b"ab", read_func(bufio, 2))
1486        bufio.write(b"12")
1487        self.assertEqual(b"ef", read_func(bufio, 2))
1488        self.assertEqual(6, bufio.tell())
1489        bufio.flush()
1490        self.assertEqual(6, bufio.tell())
1491        self.assertEqual(b"ghi", read_func(bufio))
1492        raw.seek(0, 0)
1493        raw.write(b"XYZ")
1494        # flush() resets the read buffer
1495        bufio.flush()
1496        bufio.seek(0, 0)
1497        self.assertEqual(b"XYZ", read_func(bufio, 3))
1498
1499    def test_flush_and_read(self):
1500        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1501
1502    def test_flush_and_readinto(self):
1503        def _readinto(bufio, n=-1):
1504            b = bytearray(n if n >= 0 else 9999)
1505            n = bufio.readinto(b)
1506            return bytes(b[:n])
1507        self.check_flush_and_read(_readinto)
1508
1509    def test_flush_and_peek(self):
1510        def _peek(bufio, n=-1):
1511            # This relies on the fact that the buffer can contain the whole
1512            # raw stream, otherwise peek() can return less.
1513            b = bufio.peek(n)
1514            if n != -1:
1515                b = b[:n]
1516            bufio.seek(len(b), 1)
1517            return b
1518        self.check_flush_and_read(_peek)
1519
1520    def test_flush_and_write(self):
1521        raw = self.BytesIO(b"abcdefghi")
1522        bufio = self.tp(raw)
1523
1524        bufio.write(b"123")
1525        bufio.flush()
1526        bufio.write(b"45")
1527        bufio.flush()
1528        bufio.seek(0, 0)
1529        self.assertEqual(b"12345fghi", raw.getvalue())
1530        self.assertEqual(b"12345fghi", bufio.read())
1531
1532    def test_threads(self):
1533        BufferedReaderTest.test_threads(self)
1534        BufferedWriterTest.test_threads(self)
1535
1536    def test_writes_and_peek(self):
1537        def _peek(bufio):
1538            bufio.peek(1)
1539        self.check_writes(_peek)
1540        def _peek(bufio):
1541            pos = bufio.tell()
1542            bufio.seek(-1, 1)
1543            bufio.peek(1)
1544            bufio.seek(pos, 0)
1545        self.check_writes(_peek)
1546
1547    def test_writes_and_reads(self):
1548        def _read(bufio):
1549            bufio.seek(-1, 1)
1550            bufio.read(1)
1551        self.check_writes(_read)
1552
1553    def test_writes_and_read1s(self):
1554        def _read1(bufio):
1555            bufio.seek(-1, 1)
1556            bufio.read1(1)
1557        self.check_writes(_read1)
1558
1559    def test_writes_and_readintos(self):
1560        def _read(bufio):
1561            bufio.seek(-1, 1)
1562            bufio.readinto(bytearray(1))
1563        self.check_writes(_read)
1564
1565    def test_write_after_readahead(self):
1566        # Issue #6629: writing after the buffer was filled by readahead should
1567        # first rewind the raw stream.
1568        for overwrite_size in [1, 5]:
1569            raw = self.BytesIO(b"A" * 10)
1570            bufio = self.tp(raw, 4)
1571            # Trigger readahead
1572            self.assertEqual(bufio.read(1), b"A")
1573            self.assertEqual(bufio.tell(), 1)
1574            # Overwriting should rewind the raw stream if it needs so
1575            bufio.write(b"B" * overwrite_size)
1576            self.assertEqual(bufio.tell(), overwrite_size + 1)
1577            # If the write size was smaller than the buffer size, flush() and
1578            # check that rewind happens.
1579            bufio.flush()
1580            self.assertEqual(bufio.tell(), overwrite_size + 1)
1581            s = raw.getvalue()
1582            self.assertEqual(s,
1583                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1584
1585    def test_write_rewind_write(self):
1586        # Various combinations of reading / writing / seeking backwards / writing again
1587        def mutate(bufio, pos1, pos2):
1588            assert pos2 >= pos1
1589            # Fill the buffer
1590            bufio.seek(pos1)
1591            bufio.read(pos2 - pos1)
1592            bufio.write(b'\x02')
1593            # This writes earlier than the previous write, but still inside
1594            # the buffer.
1595            bufio.seek(pos1)
1596            bufio.write(b'\x01')
1597
1598        b = b"\x80\x81\x82\x83\x84"
1599        for i in range(0, len(b)):
1600            for j in range(i, len(b)):
1601                raw = self.BytesIO(b)
1602                bufio = self.tp(raw, 100)
1603                mutate(bufio, i, j)
1604                bufio.flush()
1605                expected = bytearray(b)
1606                expected[j] = 2
1607                expected[i] = 1
1608                self.assertEqual(raw.getvalue(), expected,
1609                                 "failed result for i=%d, j=%d" % (i, j))
1610
1611    def test_truncate_after_read_or_write(self):
1612        raw = self.BytesIO(b"A" * 10)
1613        bufio = self.tp(raw, 100)
1614        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1615        self.assertEqual(bufio.truncate(), 2)
1616        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1617        self.assertEqual(bufio.truncate(), 4)
1618
1619    def test_misbehaved_io(self):
1620        BufferedReaderTest.test_misbehaved_io(self)
1621        BufferedWriterTest.test_misbehaved_io(self)
1622
1623    def test_interleaved_read_write(self):
1624        # Test for issue #12213
1625        with self.BytesIO(b'abcdefgh') as raw:
1626            with self.tp(raw, 100) as f:
1627                f.write(b"1")
1628                self.assertEqual(f.read(1), b'b')
1629                f.write(b'2')
1630                self.assertEqual(f.read1(1), b'd')
1631                f.write(b'3')
1632                buf = bytearray(1)
1633                f.readinto(buf)
1634                self.assertEqual(buf, b'f')
1635                f.write(b'4')
1636                self.assertEqual(f.peek(1), b'h')
1637                f.flush()
1638                self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1639
1640        with self.BytesIO(b'abc') as raw:
1641            with self.tp(raw, 100) as f:
1642                self.assertEqual(f.read(1), b'a')
1643                f.write(b"2")
1644                self.assertEqual(f.read(1), b'c')
1645                f.flush()
1646                self.assertEqual(raw.getvalue(), b'a2c')
1647
1648    def test_interleaved_readline_write(self):
1649        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1650            with self.tp(raw) as f:
1651                f.write(b'1')
1652                self.assertEqual(f.readline(), b'b\n')
1653                f.write(b'2')
1654                self.assertEqual(f.readline(), b'def\n')
1655                f.write(b'3')
1656                self.assertEqual(f.readline(), b'\n')
1657                f.flush()
1658                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1659
1660
1661class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
1662                          BufferedRandomTest, SizeofTest):
1663    tp = io.BufferedRandom
1664
1665    def test_constructor(self):
1666        BufferedRandomTest.test_constructor(self)
1667        # The allocation can succeed on 32-bit builds, e.g. with more
1668        # than 2GB RAM and a 64-bit kernel.
1669        if sys.maxsize > 0x7FFFFFFF:
1670            rawio = self.MockRawIO()
1671            bufio = self.tp(rawio)
1672            self.assertRaises((OverflowError, MemoryError, ValueError),
1673                bufio.__init__, rawio, sys.maxsize)
1674
1675    def test_garbage_collection(self):
1676        CBufferedReaderTest.test_garbage_collection(self)
1677        CBufferedWriterTest.test_garbage_collection(self)
1678
1679    def test_args_error(self):
1680        # Issue #17275
1681        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
1682            self.tp(io.BytesIO(), 1024, 1024, 1024)
1683
1684
1685class PyBufferedRandomTest(BufferedRandomTest):
1686    tp = pyio.BufferedRandom
1687
1688
1689# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1690# properties:
1691#   - A single output character can correspond to many bytes of input.
1692#   - The number of input bytes to complete the character can be
1693#     undetermined until the last input byte is received.
1694#   - The number of input bytes can vary depending on previous input.
1695#   - A single input byte can correspond to many characters of output.
1696#   - The number of output characters can be undetermined until the
1697#     last input byte is received.
1698#   - The number of output characters can vary depending on previous input.
1699
1700class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
1701    """
1702    For testing seek/tell behavior with a stateful, buffering decoder.
1703
1704    Input is a sequence of words.  Words may be fixed-length (length set
1705    by input) or variable-length (period-terminated).  In variable-length
1706    mode, extra periods are ignored.  Possible words are:
1707      - 'i' followed by a number sets the input length, I (maximum 99).
1708        When I is set to 0, words are space-terminated.
1709      - 'o' followed by a number sets the output length, O (maximum 99).
1710      - Any other word is converted into a word followed by a period on
1711        the output.  The output word consists of the input word truncated
1712        or padded out with hyphens to make its length equal to O.  If O
1713        is 0, the word is output verbatim without truncating or padding.
1714    I and O are initially set to 1.  When I changes, any buffered input is
1715    re-scanned according to the new I.  EOF also terminates the last word.
1716    """
1717
1718    def __init__(self, errors='strict'):
1719        codecs.IncrementalDecoder.__init__(self, errors)
1720        self.reset()
1721
1722    def __repr__(self):
1723        return '<SID %x>' % id(self)
1724
1725    def reset(self):
1726        self.i = 1
1727        self.o = 1
1728        self.buffer = bytearray()
1729
1730    def getstate(self):
1731        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
1732        return bytes(self.buffer), i*100 + o
1733
1734    def setstate(self, state):
1735        buffer, io = state
1736        self.buffer = bytearray(buffer)
1737        i, o = divmod(io, 100)
1738        self.i, self.o = i ^ 1, o ^ 1
1739
1740    def decode(self, input, final=False):
1741        output = ''
1742        for b in input:
1743            if self.i == 0: # variable-length, terminated with period
1744                if b == '.':
1745                    if self.buffer:
1746                        output += self.process_word()
1747                else:
1748                    self.buffer.append(b)
1749            else: # fixed-length, terminate after self.i bytes
1750                self.buffer.append(b)
1751                if len(self.buffer) == self.i:
1752                    output += self.process_word()
1753        if final and self.buffer: # EOF terminates the last word
1754            output += self.process_word()
1755        return output
1756
1757    def process_word(self):
1758        output = ''
1759        if self.buffer[0] == ord('i'):
1760            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
1761        elif self.buffer[0] == ord('o'):
1762            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
1763        else:
1764            output = self.buffer.decode('ascii')
1765            if len(output) < self.o:
1766                output += '-'*self.o # pad out with hyphens
1767            if self.o:
1768                output = output[:self.o] # truncate to output length
1769            output += '.'
1770        self.buffer = bytearray()
1771        return output
1772
1773    codecEnabled = False
1774
1775    @classmethod
1776    def lookupTestDecoder(cls, name):
1777        if cls.codecEnabled and name == 'test_decoder':
1778            latin1 = codecs.lookup('latin-1')
1779            return codecs.CodecInfo(
1780                name='test_decoder', encode=latin1.encode, decode=None,
1781                incrementalencoder=None,
1782                streamreader=None, streamwriter=None,
1783                incrementaldecoder=cls)
1784
1785# Register the previous decoder for testing.
1786# Disabled by default, tests will enable it.
1787codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1788
1789
1790class StatefulIncrementalDecoderTest(unittest.TestCase):
1791    """
1792    Make sure the StatefulIncrementalDecoder actually works.
1793    """
1794
1795    test_cases = [
1796        # I=1, O=1 (fixed-length input == fixed-length output)
1797        (b'abcd', False, 'a.b.c.d.'),
1798        # I=0, O=0 (variable-length input, variable-length output)
1799        (b'oiabcd', True, 'abcd.'),
1800        # I=0, O=0 (should ignore extra periods)
1801        (b'oi...abcd...', True, 'abcd.'),
1802        # I=0, O=6 (variable-length input, fixed-length output)
1803        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1804        # I=2, O=6 (fixed-length input < fixed-length output)
1805        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1806        # I=6, O=3 (fixed-length input > fixed-length output)
1807        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1808        # I=0, then 3; O=29, then 15 (with longer output)
1809        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1810         'a----------------------------.' +
1811         'b----------------------------.' +
1812         'cde--------------------------.' +
1813         'abcdefghijabcde.' +
1814         'a.b------------.' +
1815         '.c.------------.' +
1816         'd.e------------.' +
1817         'k--------------.' +
1818         'l--------------.' +
1819         'm--------------.')
1820    ]
1821
1822    def test_decoder(self):
1823        # Try a few one-shot test cases.
1824        for input, eof, output in self.test_cases:
1825            d = StatefulIncrementalDecoder()
1826            self.assertEqual(d.decode(input, eof), output)
1827
1828        # Also test an unfinished decode, followed by forcing EOF.
1829        d = StatefulIncrementalDecoder()
1830        self.assertEqual(d.decode(b'oiabcd'), '')
1831        self.assertEqual(d.decode(b'', 1), 'abcd.')
1832
1833class TextIOWrapperTest(unittest.TestCase):
1834
1835    def setUp(self):
1836        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
1837        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
1838        support.unlink(support.TESTFN)
1839
1840    def tearDown(self):
1841        support.unlink(support.TESTFN)
1842
1843    def test_constructor(self):
1844        r = self.BytesIO(b"\xc3\xa9\n\n")
1845        b = self.BufferedReader(r, 1000)
1846        t = self.TextIOWrapper(b)
1847        t.__init__(b, encoding="latin1", newline="\r\n")
1848        self.assertEqual(t.encoding, "latin1")
1849        self.assertEqual(t.line_buffering, False)
1850        t.__init__(b, encoding="utf8", line_buffering=True)
1851        self.assertEqual(t.encoding, "utf8")
1852        self.assertEqual(t.line_buffering, True)
1853        self.assertEqual("\xe9\n", t.readline())
1854        self.assertRaises(TypeError, t.__init__, b, newline=42)
1855        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1856
1857    def test_detach(self):
1858        r = self.BytesIO()
1859        b = self.BufferedWriter(r)
1860        t = self.TextIOWrapper(b)
1861        self.assertIs(t.detach(), b)
1862
1863        t = self.TextIOWrapper(b, encoding="ascii")
1864        t.write("howdy")
1865        self.assertFalse(r.getvalue())
1866        t.detach()
1867        self.assertEqual(r.getvalue(), b"howdy")
1868        self.assertRaises(ValueError, t.detach)
1869
1870    def test_repr(self):
1871        raw = self.BytesIO("hello".encode("utf-8"))
1872        b = self.BufferedReader(raw)
1873        t = self.TextIOWrapper(b, encoding="utf-8")
1874        modname = self.TextIOWrapper.__module__
1875        self.assertEqual(repr(t),
1876                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
1877        raw.name = "dummy"
1878        self.assertEqual(repr(t),
1879                         "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
1880        raw.name = b"dummy"
1881        self.assertEqual(repr(t),
1882                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
1883
1884    def test_line_buffering(self):
1885        r = self.BytesIO()
1886        b = self.BufferedWriter(r, 1000)
1887        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
1888        t.write("X")
1889        self.assertEqual(r.getvalue(), b"")  # No flush happened
1890        t.write("Y\nZ")
1891        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
1892        t.write("A\rB")
1893        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
1894
1895    def test_encoding(self):
1896        # Check the encoding attribute is always set, and valid
1897        b = self.BytesIO()
1898        t = self.TextIOWrapper(b, encoding="utf8")
1899        self.assertEqual(t.encoding, "utf8")
1900        t = self.TextIOWrapper(b)
1901        self.assertTrue(t.encoding is not None)
1902        codecs.lookup(t.encoding)
1903
1904    def test_encoding_errors_reading(self):
1905        # (1) default
1906        b = self.BytesIO(b"abc\n\xff\n")
1907        t = self.TextIOWrapper(b, encoding="ascii")
1908        self.assertRaises(UnicodeError, t.read)
1909        # (2) explicit strict
1910        b = self.BytesIO(b"abc\n\xff\n")
1911        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1912        self.assertRaises(UnicodeError, t.read)
1913        # (3) ignore
1914        b = self.BytesIO(b"abc\n\xff\n")
1915        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
1916        self.assertEqual(t.read(), "abc\n\n")
1917        # (4) replace
1918        b = self.BytesIO(b"abc\n\xff\n")
1919        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
1920        self.assertEqual(t.read(), "abc\n\ufffd\n")
1921
1922    def test_encoding_errors_writing(self):
1923        # (1) default
1924        b = self.BytesIO()
1925        t = self.TextIOWrapper(b, encoding="ascii")
1926        self.assertRaises(UnicodeError, t.write, "\xff")
1927        # (2) explicit strict
1928        b = self.BytesIO()
1929        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
1930        self.assertRaises(UnicodeError, t.write, "\xff")
1931        # (3) ignore
1932        b = self.BytesIO()
1933        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
1934                             newline="\n")
1935        t.write("abc\xffdef\n")
1936        t.flush()
1937        self.assertEqual(b.getvalue(), b"abcdef\n")
1938        # (4) replace
1939        b = self.BytesIO()
1940        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
1941                             newline="\n")
1942        t.write("abc\xffdef\n")
1943        t.flush()
1944        self.assertEqual(b.getvalue(), b"abc?def\n")
1945
1946    def test_newlines(self):
1947        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
1948
1949        tests = [
1950            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
1951            [ '', input_lines ],
1952            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
1953            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
1954            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
1955        ]
1956        encodings = (
1957            'utf-8', 'latin-1',
1958            'utf-16', 'utf-16-le', 'utf-16-be',
1959            'utf-32', 'utf-32-le', 'utf-32-be',
1960        )
1961
1962        # Try a range of buffer sizes to test the case where \r is the last
1963        # character in TextIOWrapper._pending_line.
1964        for encoding in encodings:
1965            # XXX: str.encode() should return bytes
1966            data = bytes(''.join(input_lines).encode(encoding))
1967            for do_reads in (False, True):
1968                for bufsize in range(1, 10):
1969                    for newline, exp_lines in tests:
1970                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
1971                        textio = self.TextIOWrapper(bufio, newline=newline,
1972                                                  encoding=encoding)
1973                        if do_reads:
1974                            got_lines = []
1975                            while True:
1976                                c2 = textio.read(2)
1977                                if c2 == '':
1978                                    break
1979                                self.assertEqual(len(c2), 2)
1980                                got_lines.append(c2 + textio.readline())
1981                        else:
1982                            got_lines = list(textio)
1983
1984                        for got_line, exp_line in zip(got_lines, exp_lines):
1985                            self.assertEqual(got_line, exp_line)
1986                        self.assertEqual(len(got_lines), len(exp_lines))
1987
1988    def test_newlines_input(self):
1989        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
1990        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1991        for newline, expected in [
1992            (None, normalized.decode("ascii").splitlines(True)),
1993            ("", testdata.decode("ascii").splitlines(True)),
1994            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1995            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1996            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
1997            ]:
1998            buf = self.BytesIO(testdata)
1999            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2000            self.assertEqual(txt.readlines(), expected)
2001            txt.seek(0)
2002            self.assertEqual(txt.read(), "".join(expected))
2003
2004    def test_newlines_output(self):
2005        testdict = {
2006            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2007            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2008            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2009            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2010            }
2011        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2012        for newline, expected in tests:
2013            buf = self.BytesIO()
2014            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2015            txt.write("AAA\nB")
2016            txt.write("BB\nCCC\n")
2017            txt.write("X\rY\r\nZ")
2018            txt.flush()
2019            self.assertEqual(buf.closed, False)
2020            self.assertEqual(buf.getvalue(), expected)
2021
2022    def test_destructor(self):
2023        l = []
2024        base = self.BytesIO
2025        class MyBytesIO(base):
2026            def close(self):
2027                l.append(self.getvalue())
2028                base.close(self)
2029        b = MyBytesIO()
2030        t = self.TextIOWrapper(b, encoding="ascii")
2031        t.write("abc")
2032        del t
2033        support.gc_collect()
2034        self.assertEqual([b"abc"], l)
2035
2036    def test_override_destructor(self):
2037        record = []
2038        class MyTextIO(self.TextIOWrapper):
2039            def __del__(self):
2040                record.append(1)
2041                try:
2042                    f = super(MyTextIO, self).__del__
2043                except AttributeError:
2044                    pass
2045                else:
2046                    f()
2047            def close(self):
2048                record.append(2)
2049                super(MyTextIO, self).close()
2050            def flush(self):
2051                record.append(3)
2052                super(MyTextIO, self).flush()
2053        b = self.BytesIO()
2054        t = MyTextIO(b, encoding="ascii")
2055        del t
2056        support.gc_collect()
2057        self.assertEqual(record, [1, 2, 3])
2058
2059    def test_error_through_destructor(self):
2060        # Test that the exception state is not modified by a destructor,
2061        # even if close() fails.
2062        rawio = self.CloseFailureIO()
2063        def f():
2064            self.TextIOWrapper(rawio).xyzzy
2065        with support.captured_output("stderr") as s:
2066            self.assertRaises(AttributeError, f)
2067        s = s.getvalue().strip()
2068        if s:
2069            # The destructor *may* have printed an unraisable error, check it
2070            self.assertEqual(len(s.splitlines()), 1)
2071            self.assertTrue(s.startswith("Exception IOError: "), s)
2072            self.assertTrue(s.endswith(" ignored"), s)
2073
2074    # Systematic tests of the text I/O API
2075
2076    def test_basic_io(self):
2077        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2078            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
2079                f = self.open(support.TESTFN, "w+", encoding=enc)
2080                f._CHUNK_SIZE = chunksize
2081                self.assertEqual(f.write("abc"), 3)
2082                f.close()
2083                f = self.open(support.TESTFN, "r+", encoding=enc)
2084                f._CHUNK_SIZE = chunksize
2085                self.assertEqual(f.tell(), 0)
2086                self.assertEqual(f.read(), "abc")
2087                cookie = f.tell()
2088                self.assertEqual(f.seek(0), 0)
2089                self.assertEqual(f.read(None), "abc")
2090                f.seek(0)
2091                self.assertEqual(f.read(2), "ab")
2092                self.assertEqual(f.read(1), "c")
2093                self.assertEqual(f.read(1), "")
2094                self.assertEqual(f.read(), "")
2095                self.assertEqual(f.tell(), cookie)
2096                self.assertEqual(f.seek(0), 0)
2097                self.assertEqual(f.seek(0, 2), cookie)
2098                self.assertEqual(f.write("def"), 3)
2099                self.assertEqual(f.seek(cookie), cookie)
2100                self.assertEqual(f.read(), "def")
2101                if enc.startswith("utf"):
2102                    self.multi_line_test(f, enc)
2103                f.close()
2104
2105    def multi_line_test(self, f, enc):
2106        f.seek(0)
2107        f.truncate()
2108        sample = "s\xff\u0fff\uffff"
2109        wlines = []
2110        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2111            chars = []
2112            for i in range(size):
2113                chars.append(sample[i % len(sample)])
2114            line = "".join(chars) + "\n"
2115            wlines.append((f.tell(), line))
2116            f.write(line)
2117        f.seek(0)
2118        rlines = []
2119        while True:
2120            pos = f.tell()
2121            line = f.readline()
2122            if not line:
2123                break
2124            rlines.append((pos, line))
2125        self.assertEqual(rlines, wlines)
2126
2127    def test_telling(self):
2128        f = self.open(support.TESTFN, "w+", encoding="utf8")
2129        p0 = f.tell()
2130        f.write("\xff\n")
2131        p1 = f.tell()
2132        f.write("\xff\n")
2133        p2 = f.tell()
2134        f.seek(0)
2135        self.assertEqual(f.tell(), p0)
2136        self.assertEqual(f.readline(), "\xff\n")
2137        self.assertEqual(f.tell(), p1)
2138        self.assertEqual(f.readline(), "\xff\n")
2139        self.assertEqual(f.tell(), p2)
2140        f.seek(0)
2141        for line in f:
2142            self.assertEqual(line, "\xff\n")
2143            self.assertRaises(IOError, f.tell)
2144        self.assertEqual(f.tell(), p2)
2145        f.close()
2146
2147    def test_seeking(self):
2148        chunk_size = _default_chunk_size()
2149        prefix_size = chunk_size - 2
2150        u_prefix = "a" * prefix_size
2151        prefix = bytes(u_prefix.encode("utf-8"))
2152        self.assertEqual(len(u_prefix), len(prefix))
2153        u_suffix = "\u8888\n"
2154        suffix = bytes(u_suffix.encode("utf-8"))
2155        line = prefix + suffix
2156        f = self.open(support.TESTFN, "wb")
2157        f.write(line*2)
2158        f.close()
2159        f = self.open(support.TESTFN, "r", encoding="utf-8")
2160        s = f.read(prefix_size)
2161        self.assertEqual(s, prefix.decode("ascii"))
2162        self.assertEqual(f.tell(), prefix_size)
2163        self.assertEqual(f.readline(), u_suffix)
2164
2165    def test_seeking_too(self):
2166        # Regression test for a specific bug
2167        data = b'\xe0\xbf\xbf\n'
2168        f = self.open(support.TESTFN, "wb")
2169        f.write(data)
2170        f.close()
2171        f = self.open(support.TESTFN, "r", encoding="utf-8")
2172        f._CHUNK_SIZE  # Just test that it exists
2173        f._CHUNK_SIZE = 2
2174        f.readline()
2175        f.tell()
2176
2177    def test_seek_and_tell(self):
2178        #Test seek/tell using the StatefulIncrementalDecoder.
2179        # Make test faster by doing smaller seeks
2180        CHUNK_SIZE = 128
2181
2182        def test_seek_and_tell_with_data(data, min_pos=0):
2183            """Tell/seek to various points within a data stream and ensure
2184            that the decoded data returned by read() is consistent."""
2185            f = self.open(support.TESTFN, 'wb')
2186            f.write(data)
2187            f.close()
2188            f = self.open(support.TESTFN, encoding='test_decoder')
2189            f._CHUNK_SIZE = CHUNK_SIZE
2190            decoded = f.read()
2191            f.close()
2192
2193            for i in range(min_pos, len(decoded) + 1): # seek positions
2194                for j in [1, 5, len(decoded) - i]: # read lengths
2195                    f = self.open(support.TESTFN, encoding='test_decoder')
2196                    self.assertEqual(f.read(i), decoded[:i])
2197                    cookie = f.tell()
2198                    self.assertEqual(f.read(j), decoded[i:i + j])
2199                    f.seek(cookie)
2200                    self.assertEqual(f.read(), decoded[i:])
2201                    f.close()
2202
2203        # Enable the test decoder.
2204        StatefulIncrementalDecoder.codecEnabled = 1
2205
2206        # Run the tests.
2207        try:
2208            # Try each test case.
2209            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2210                test_seek_and_tell_with_data(input)
2211
2212            # Position each test case so that it crosses a chunk boundary.
2213            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2214                offset = CHUNK_SIZE - len(input)//2
2215                prefix = b'.'*offset
2216                # Don't bother seeking into the prefix (takes too long).
2217                min_pos = offset*2
2218                test_seek_and_tell_with_data(prefix + input, min_pos)
2219
2220        # Ensure our test decoder won't interfere with subsequent tests.
2221        finally:
2222            StatefulIncrementalDecoder.codecEnabled = 0
2223
2224    def test_encoded_writes(self):
2225        data = "1234567890"
2226        tests = ("utf-16",
2227                 "utf-16-le",
2228                 "utf-16-be",
2229                 "utf-32",
2230                 "utf-32-le",
2231                 "utf-32-be")
2232        for encoding in tests:
2233            buf = self.BytesIO()
2234            f = self.TextIOWrapper(buf, encoding=encoding)
2235            # Check if the BOM is written only once (see issue1753).
2236            f.write(data)
2237            f.write(data)
2238            f.seek(0)
2239            self.assertEqual(f.read(), data * 2)
2240            f.seek(0)
2241            self.assertEqual(f.read(), data * 2)
2242            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
2243
2244    def test_unreadable(self):
2245        class UnReadable(self.BytesIO):
2246            def readable(self):
2247                return False
2248        txt = self.TextIOWrapper(UnReadable())
2249        self.assertRaises(IOError, txt.read)
2250
2251    def test_read_one_by_one(self):
2252        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
2253        reads = ""
2254        while True:
2255            c = txt.read(1)
2256            if not c:
2257                break
2258            reads += c
2259        self.assertEqual(reads, "AA\nBB")
2260
2261    def test_readlines(self):
2262        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2263        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2264        txt.seek(0)
2265        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2266        txt.seek(0)
2267        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2268
2269    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
2270    def test_read_by_chunk(self):
2271        # make sure "\r\n" straddles 128 char boundary.
2272        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
2273        reads = ""
2274        while True:
2275            c = txt.read(128)
2276            if not c:
2277                break
2278            reads += c
2279        self.assertEqual(reads, "A"*127+"\nB")
2280
2281    def test_writelines(self):
2282        l = ['ab', 'cd', 'ef']
2283        buf = self.BytesIO()
2284        txt = self.TextIOWrapper(buf)
2285        txt.writelines(l)
2286        txt.flush()
2287        self.assertEqual(buf.getvalue(), b'abcdef')
2288
2289    def test_writelines_userlist(self):
2290        l = UserList(['ab', 'cd', 'ef'])
2291        buf = self.BytesIO()
2292        txt = self.TextIOWrapper(buf)
2293        txt.writelines(l)
2294        txt.flush()
2295        self.assertEqual(buf.getvalue(), b'abcdef')
2296
2297    def test_writelines_error(self):
2298        txt = self.TextIOWrapper(self.BytesIO())
2299        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2300        self.assertRaises(TypeError, txt.writelines, None)
2301        self.assertRaises(TypeError, txt.writelines, b'abc')
2302
2303    def test_issue1395_1(self):
2304        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2305
2306        # read one char at a time
2307        reads = ""
2308        while True:
2309            c = txt.read(1)
2310            if not c:
2311                break
2312            reads += c
2313        self.assertEqual(reads, self.normalized)
2314
2315    def test_issue1395_2(self):
2316        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2317        txt._CHUNK_SIZE = 4
2318
2319        reads = ""
2320        while True:
2321            c = txt.read(4)
2322            if not c:
2323                break
2324            reads += c
2325        self.assertEqual(reads, self.normalized)
2326
2327    def test_issue1395_3(self):
2328        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2329        txt._CHUNK_SIZE = 4
2330
2331        reads = txt.read(4)
2332        reads += txt.read(4)
2333        reads += txt.readline()
2334        reads += txt.readline()
2335        reads += txt.readline()
2336        self.assertEqual(reads, self.normalized)
2337
2338    def test_issue1395_4(self):
2339        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2340        txt._CHUNK_SIZE = 4
2341
2342        reads = txt.read(4)
2343        reads += txt.read()
2344        self.assertEqual(reads, self.normalized)
2345
2346    def test_issue1395_5(self):
2347        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2348        txt._CHUNK_SIZE = 4
2349
2350        reads = txt.read(4)
2351        pos = txt.tell()
2352        txt.seek(0)
2353        txt.seek(pos)
2354        self.assertEqual(txt.read(4), "BBB\n")
2355
2356    def test_issue2282(self):
2357        buffer = self.BytesIO(self.testdata)
2358        txt = self.TextIOWrapper(buffer, encoding="ascii")
2359
2360        self.assertEqual(buffer.seekable(), txt.seekable())
2361
2362    def test_append_bom(self):
2363        # The BOM is not written again when appending to a non-empty file
2364        filename = support.TESTFN
2365        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2366            with self.open(filename, 'w', encoding=charset) as f:
2367                f.write('aaa')
2368                pos = f.tell()
2369            with self.open(filename, 'rb') as f:
2370                self.assertEqual(f.read(), 'aaa'.encode(charset))
2371
2372            with self.open(filename, 'a', encoding=charset) as f:
2373                f.write('xxx')
2374            with self.open(filename, 'rb') as f:
2375                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2376
2377    def test_seek_bom(self):
2378        # Same test, but when seeking manually
2379        filename = support.TESTFN
2380        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2381            with self.open(filename, 'w', encoding=charset) as f:
2382                f.write('aaa')
2383                pos = f.tell()
2384            with self.open(filename, 'r+', encoding=charset) as f:
2385                f.seek(pos)
2386                f.write('zzz')
2387                f.seek(0)
2388                f.write('bbb')
2389            with self.open(filename, 'rb') as f:
2390                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2391
2392    def test_errors_property(self):
2393        with self.open(support.TESTFN, "w") as f:
2394            self.assertEqual(f.errors, "strict")
2395        with self.open(support.TESTFN, "w", errors="replace") as f:
2396            self.assertEqual(f.errors, "replace")
2397
2398    @unittest.skipUnless(threading, 'Threading required for this test.')
2399    def test_threads_write(self):
2400        # Issue6750: concurrent writes could duplicate data
2401        event = threading.Event()
2402        with self.open(support.TESTFN, "w", buffering=1) as f:
2403            def run(n):
2404                text = "Thread%03d\n" % n
2405                event.wait()
2406                f.write(text)
2407            threads = [threading.Thread(target=lambda n=x: run(n))
2408                       for x in range(20)]
2409            for t in threads:
2410                t.start()
2411            time.sleep(0.02)
2412            event.set()
2413            for t in threads:
2414                t.join()
2415        with self.open(support.TESTFN) as f:
2416            content = f.read()
2417            for n in range(20):
2418                self.assertEqual(content.count("Thread%03d\n" % n), 1)
2419
2420    def test_flush_error_on_close(self):
2421        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2422        def bad_flush():
2423            raise IOError()
2424        txt.flush = bad_flush
2425        self.assertRaises(IOError, txt.close) # exception not swallowed
2426        self.assertTrue(txt.closed)
2427
2428    def test_multi_close(self):
2429        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2430        txt.close()
2431        txt.close()
2432        txt.close()
2433        self.assertRaises(ValueError, txt.flush)
2434
2435    def test_readonly_attributes(self):
2436        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2437        buf = self.BytesIO(self.testdata)
2438        with self.assertRaises((AttributeError, TypeError)):
2439            txt.buffer = buf
2440
2441    def test_read_nonbytes(self):
2442        # Issue #17106
2443        # Crash when underlying read() returns non-bytes
2444        class NonbytesStream(self.StringIO):
2445            read1 = self.StringIO.read
2446        class NonbytesStream(self.StringIO):
2447            read1 = self.StringIO.read
2448        t = self.TextIOWrapper(NonbytesStream('a'))
2449        with self.maybeRaises(TypeError):
2450            t.read(1)
2451        t = self.TextIOWrapper(NonbytesStream('a'))
2452        with self.maybeRaises(TypeError):
2453            t.readline()
2454        t = self.TextIOWrapper(NonbytesStream('a'))
2455        self.assertEqual(t.read(), u'a')
2456
2457    def test_illegal_decoder(self):
2458        # Issue #17106
2459        # Crash when decoder returns non-string
2460        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2461                               encoding='quopri_codec')
2462        with self.maybeRaises(TypeError):
2463            t.read(1)
2464        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2465                               encoding='quopri_codec')
2466        with self.maybeRaises(TypeError):
2467            t.readline()
2468        t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2469                               encoding='quopri_codec')
2470        with self.maybeRaises(TypeError):
2471            t.read()
2472
2473
2474class CTextIOWrapperTest(TextIOWrapperTest):
2475
2476    def test_initialization(self):
2477        r = self.BytesIO(b"\xc3\xa9\n\n")
2478        b = self.BufferedReader(r, 1000)
2479        t = self.TextIOWrapper(b)
2480        self.assertRaises(TypeError, t.__init__, b, newline=42)
2481        self.assertRaises(ValueError, t.read)
2482        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2483        self.assertRaises(ValueError, t.read)
2484
2485    def test_garbage_collection(self):
2486        # C TextIOWrapper objects are collected, and collecting them flushes
2487        # all data to disk.
2488        # The Python version has __del__, so it ends in gc.garbage instead.
2489        rawio = io.FileIO(support.TESTFN, "wb")
2490        b = self.BufferedWriter(rawio)
2491        t = self.TextIOWrapper(b, encoding="ascii")
2492        t.write("456def")
2493        t.x = t
2494        wr = weakref.ref(t)
2495        del t
2496        support.gc_collect()
2497        self.assertTrue(wr() is None, wr)
2498        with self.open(support.TESTFN, "rb") as f:
2499            self.assertEqual(f.read(), b"456def")
2500
2501    def test_rwpair_cleared_before_textio(self):
2502        # Issue 13070: TextIOWrapper's finalization would crash when called
2503        # after the reference to the underlying BufferedRWPair's writer got
2504        # cleared by the GC.
2505        for i in range(1000):
2506            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2507            t1 = self.TextIOWrapper(b1, encoding="ascii")
2508            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
2509            t2 = self.TextIOWrapper(b2, encoding="ascii")
2510            # circular references
2511            t1.buddy = t2
2512            t2.buddy = t1
2513        support.gc_collect()
2514
2515    maybeRaises = unittest.TestCase.assertRaises
2516
2517
2518class PyTextIOWrapperTest(TextIOWrapperTest):
2519    @contextlib.contextmanager
2520    def maybeRaises(self, *args, **kwds):
2521        yield
2522
2523
2524class IncrementalNewlineDecoderTest(unittest.TestCase):
2525
2526    def check_newline_decoding_utf8(self, decoder):
2527        # UTF-8 specific tests for a newline decoder
2528        def _check_decode(b, s, **kwargs):
2529            # We exercise getstate() / setstate() as well as decode()
2530            state = decoder.getstate()
2531            self.assertEqual(decoder.decode(b, **kwargs), s)
2532            decoder.setstate(state)
2533            self.assertEqual(decoder.decode(b, **kwargs), s)
2534
2535        _check_decode(b'\xe8\xa2\x88', "\u8888")
2536
2537        _check_decode(b'\xe8', "")
2538        _check_decode(b'\xa2', "")
2539        _check_decode(b'\x88', "\u8888")
2540
2541        _check_decode(b'\xe8', "")
2542        _check_decode(b'\xa2', "")
2543        _check_decode(b'\x88', "\u8888")
2544
2545        _check_decode(b'\xe8', "")
2546        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
2547
2548        decoder.reset()
2549        _check_decode(b'\n', "\n")
2550        _check_decode(b'\r', "")
2551        _check_decode(b'', "\n", final=True)
2552        _check_decode(b'\r', "\n", final=True)
2553
2554        _check_decode(b'\r', "")
2555        _check_decode(b'a', "\na")
2556
2557        _check_decode(b'\r\r\n', "\n\n")
2558        _check_decode(b'\r', "")
2559        _check_decode(b'\r', "\n")
2560        _check_decode(b'\na', "\na")
2561
2562        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
2563        _check_decode(b'\xe8\xa2\x88', "\u8888")
2564        _check_decode(b'\n', "\n")
2565        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
2566        _check_decode(b'\n', "\n")
2567
2568    def check_newline_decoding(self, decoder, encoding):
2569        result = []
2570        if encoding is not None:
2571            encoder = codecs.getincrementalencoder(encoding)()
2572            def _decode_bytewise(s):
2573                # Decode one byte at a time
2574                for b in encoder.encode(s):
2575                    result.append(decoder.decode(b))
2576        else:
2577            encoder = None
2578            def _decode_bytewise(s):
2579                # Decode one char at a time
2580                for c in s:
2581                    result.append(decoder.decode(c))
2582        self.assertEqual(decoder.newlines, None)
2583        _decode_bytewise("abc\n\r")
2584        self.assertEqual(decoder.newlines, '\n')
2585        _decode_bytewise("\nabc")
2586        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2587        _decode_bytewise("abc\r")
2588        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
2589        _decode_bytewise("abc")
2590        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
2591        _decode_bytewise("abc\r")
2592        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
2593        decoder.reset()
2594        input = "abc"
2595        if encoder is not None:
2596            encoder.reset()
2597            input = encoder.encode(input)
2598        self.assertEqual(decoder.decode(input), "abc")
2599        self.assertEqual(decoder.newlines, None)
2600
2601    def test_newline_decoder(self):
2602        encodings = (
2603            # None meaning the IncrementalNewlineDecoder takes unicode input
2604            # rather than bytes input
2605            None, 'utf-8', 'latin-1',
2606            'utf-16', 'utf-16-le', 'utf-16-be',
2607            'utf-32', 'utf-32-le', 'utf-32-be',
2608        )
2609        for enc in encodings:
2610            decoder = enc and codecs.getincrementaldecoder(enc)()
2611            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2612            self.check_newline_decoding(decoder, enc)
2613        decoder = codecs.getincrementaldecoder("utf-8")()
2614        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
2615        self.check_newline_decoding_utf8(decoder)
2616
2617    def test_newline_bytes(self):
2618        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
2619        def _check(dec):
2620            self.assertEqual(dec.newlines, None)
2621            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
2622            self.assertEqual(dec.newlines, None)
2623            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
2624            self.assertEqual(dec.newlines, None)
2625        dec = self.IncrementalNewlineDecoder(None, translate=False)
2626        _check(dec)
2627        dec = self.IncrementalNewlineDecoder(None, translate=True)
2628        _check(dec)
2629
2630class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2631    pass
2632
2633class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
2634    pass
2635
2636
2637# XXX Tests for open()
2638
2639class MiscIOTest(unittest.TestCase):
2640
2641    def tearDown(self):
2642        support.unlink(support.TESTFN)
2643
2644    def test___all__(self):
2645        for name in self.io.__all__:
2646            obj = getattr(self.io, name, None)
2647            self.assertTrue(obj is not None, name)
2648            if name == "open":
2649                continue
2650            elif "error" in name.lower() or name == "UnsupportedOperation":
2651                self.assertTrue(issubclass(obj, Exception), name)
2652            elif not name.startswith("SEEK_"):
2653                self.assertTrue(issubclass(obj, self.IOBase))
2654
2655    def test_attributes(self):
2656        f = self.open(support.TESTFN, "wb", buffering=0)
2657        self.assertEqual(f.mode, "wb")
2658        f.close()
2659
2660        f = self.open(support.TESTFN, "U")
2661        self.assertEqual(f.name,            support.TESTFN)
2662        self.assertEqual(f.buffer.name,     support.TESTFN)
2663        self.assertEqual(f.buffer.raw.name, support.TESTFN)
2664        self.assertEqual(f.mode,            "U")
2665        self.assertEqual(f.buffer.mode,     "rb")
2666        self.assertEqual(f.buffer.raw.mode, "rb")
2667        f.close()
2668
2669        f = self.open(support.TESTFN, "w+")
2670        self.assertEqual(f.mode,            "w+")
2671        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
2672        self.assertEqual(f.buffer.raw.mode, "rb+")
2673
2674        g = self.open(f.fileno(), "wb", closefd=False)
2675        self.assertEqual(g.mode,     "wb")
2676        self.assertEqual(g.raw.mode, "wb")
2677        self.assertEqual(g.name,     f.fileno())
2678        self.assertEqual(g.raw.name, f.fileno())
2679        f.close()
2680        g.close()
2681
2682    def test_io_after_close(self):
2683        for kwargs in [
2684                {"mode": "w"},
2685                {"mode": "wb"},
2686                {"mode": "w", "buffering": 1},
2687                {"mode": "w", "buffering": 2},
2688                {"mode": "wb", "buffering": 0},
2689                {"mode": "r"},
2690                {"mode": "rb"},
2691                {"mode": "r", "buffering": 1},
2692                {"mode": "r", "buffering": 2},
2693                {"mode": "rb", "buffering": 0},
2694                {"mode": "w+"},
2695                {"mode": "w+b"},
2696                {"mode": "w+", "buffering": 1},
2697                {"mode": "w+", "buffering": 2},
2698                {"mode": "w+b", "buffering": 0},
2699            ]:
2700            f = self.open(support.TESTFN, **kwargs)
2701            f.close()
2702            self.assertRaises(ValueError, f.flush)
2703            self.assertRaises(ValueError, f.fileno)
2704            self.assertRaises(ValueError, f.isatty)
2705            self.assertRaises(ValueError, f.__iter__)
2706            if hasattr(f, "peek"):
2707                self.assertRaises(ValueError, f.peek, 1)
2708            self.assertRaises(ValueError, f.read)
2709            if hasattr(f, "read1"):
2710                self.assertRaises(ValueError, f.read1, 1024)
2711            if hasattr(f, "readall"):
2712                self.assertRaises(ValueError, f.readall)
2713            if hasattr(f, "readinto"):
2714                self.assertRaises(ValueError, f.readinto, bytearray(1024))
2715            self.assertRaises(ValueError, f.readline)
2716            self.assertRaises(ValueError, f.readlines)
2717            self.assertRaises(ValueError, f.seek, 0)
2718            self.assertRaises(ValueError, f.tell)
2719            self.assertRaises(ValueError, f.truncate)
2720            self.assertRaises(ValueError, f.write,
2721                              b"" if "b" in kwargs['mode'] else "")
2722            self.assertRaises(ValueError, f.writelines, [])
2723            self.assertRaises(ValueError, next, f)
2724
2725    def test_blockingioerror(self):
2726        # Various BlockingIOError issues
2727        self.assertRaises(TypeError, self.BlockingIOError)
2728        self.assertRaises(TypeError, self.BlockingIOError, 1)
2729        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2730        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2731        b = self.BlockingIOError(1, "")
2732        self.assertEqual(b.characters_written, 0)
2733        class C(unicode):
2734            pass
2735        c = C("")
2736        b = self.BlockingIOError(1, c)
2737        c.b = b
2738        b.c = c
2739        wr = weakref.ref(c)
2740        del c, b
2741        support.gc_collect()
2742        self.assertTrue(wr() is None, wr)
2743
2744    def test_abcs(self):
2745        # Test the visible base classes are ABCs.
2746        self.assertIsInstance(self.IOBase, abc.ABCMeta)
2747        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
2748        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
2749        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
2750
2751    def _check_abc_inheritance(self, abcmodule):
2752        with self.open(support.TESTFN, "wb", buffering=0) as f:
2753            self.assertIsInstance(f, abcmodule.IOBase)
2754            self.assertIsInstance(f, abcmodule.RawIOBase)
2755            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2756            self.assertNotIsInstance(f, abcmodule.TextIOBase)
2757        with self.open(support.TESTFN, "wb") as f:
2758            self.assertIsInstance(f, abcmodule.IOBase)
2759            self.assertNotIsInstance(f, abcmodule.RawIOBase)
2760            self.assertIsInstance(f, abcmodule.BufferedIOBase)
2761            self.assertNotIsInstance(f, abcmodule.TextIOBase)
2762        with self.open(support.TESTFN, "w") as f:
2763            self.assertIsInstance(f, abcmodule.IOBase)
2764            self.assertNotIsInstance(f, abcmodule.RawIOBase)
2765            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
2766            self.assertIsInstance(f, abcmodule.TextIOBase)
2767
2768    def test_abc_inheritance(self):
2769        # Test implementations inherit from their respective ABCs
2770        self._check_abc_inheritance(self)
2771
2772    def test_abc_inheritance_official(self):
2773        # Test implementations inherit from the official ABCs of the
2774        # baseline "io" module.
2775        self._check_abc_inheritance(io)
2776
2777    @unittest.skipUnless(fcntl, 'fcntl required for this test')
2778    def test_nonblock_pipe_write_bigbuf(self):
2779        self._test_nonblock_pipe_write(16*1024)
2780
2781    @unittest.skipUnless(fcntl, 'fcntl required for this test')
2782    def test_nonblock_pipe_write_smallbuf(self):
2783        self._test_nonblock_pipe_write(1024)
2784
2785    def _set_non_blocking(self, fd):
2786        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
2787        self.assertNotEqual(flags, -1)
2788        res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
2789        self.assertEqual(res, 0)
2790
2791    def _test_nonblock_pipe_write(self, bufsize):
2792        sent = []
2793        received = []
2794        r, w = os.pipe()
2795        self._set_non_blocking(r)
2796        self._set_non_blocking(w)
2797
2798        # To exercise all code paths in the C implementation we need
2799        # to play with buffer sizes.  For instance, if we choose a
2800        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
2801        # then we will never get a partial write of the buffer.
2802        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
2803        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
2804
2805        with rf, wf:
2806            for N in 9999, 73, 7574:
2807                try:
2808                    i = 0
2809                    while True:
2810                        msg = bytes([i % 26 + 97]) * N
2811                        sent.append(msg)
2812                        wf.write(msg)
2813                        i += 1
2814
2815                except self.BlockingIOError as e:
2816                    self.assertEqual(e.args[0], errno.EAGAIN)
2817                    sent[-1] = sent[-1][:e.characters_written]
2818                    received.append(rf.read())
2819                    msg = b'BLOCKED'
2820                    wf.write(msg)
2821                    sent.append(msg)
2822
2823            while True:
2824                try:
2825                    wf.flush()
2826                    break
2827                except self.BlockingIOError as e:
2828                    self.assertEqual(e.args[0], errno.EAGAIN)
2829                    self.assertEqual(e.characters_written, 0)
2830                    received.append(rf.read())
2831
2832            received += iter(rf.read, None)
2833
2834        sent, received = b''.join(sent), b''.join(received)
2835        self.assertTrue(sent == received)
2836        self.assertTrue(wf.closed)
2837        self.assertTrue(rf.closed)
2838
2839class CMiscIOTest(MiscIOTest):
2840    io = io
2841
2842class PyMiscIOTest(MiscIOTest):
2843    io = pyio
2844
2845
2846@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
2847class SignalsTest(unittest.TestCase):
2848
2849    def setUp(self):
2850        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
2851
2852    def tearDown(self):
2853        signal.signal(signal.SIGALRM, self.oldalrm)
2854
2855    def alarm_interrupt(self, sig, frame):
2856        1 // 0
2857
2858    @unittest.skipUnless(threading, 'Threading required for this test.')
2859    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
2860                     'issue #12429: skip test on FreeBSD <= 7')
2861    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
2862        """Check that a partial write, when it gets interrupted, properly
2863        invokes the signal handler, and bubbles up the exception raised
2864        in the latter."""
2865        read_results = []
2866        def _read():
2867            s = os.read(r, 1)
2868            read_results.append(s)
2869        t = threading.Thread(target=_read)
2870        t.daemon = True
2871        r, w = os.pipe()
2872        try:
2873            wio = self.io.open(w, **fdopen_kwargs)
2874            t.start()
2875            signal.alarm(1)
2876            # Fill the pipe enough that the write will be blocking.
2877            # It will be interrupted by the timer armed above.  Since the
2878            # other thread has read one byte, the low-level write will
2879            # return with a successful (partial) result rather than an EINTR.
2880            # The buffered IO layer must check for pending signal
2881            # handlers, which in this case will invoke alarm_interrupt().
2882            self.assertRaises(ZeroDivisionError,
2883                        wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
2884            t.join()
2885            # We got one byte, get another one and check that it isn't a
2886            # repeat of the first one.
2887            read_results.append(os.read(r, 1))
2888            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
2889        finally:
2890            os.close(w)
2891            os.close(r)
2892            # This is deliberate. If we didn't close the file descriptor
2893            # before closing wio, wio would try to flush its internal
2894            # buffer, and block again.
2895            try:
2896                wio.close()
2897            except IOError as e:
2898                if e.errno != errno.EBADF:
2899                    raise
2900
2901    def test_interrupted_write_unbuffered(self):
2902        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
2903
2904    def test_interrupted_write_buffered(self):
2905        self.check_interrupted_write(b"xy", b"xy", mode="wb")
2906
2907    def test_interrupted_write_text(self):
2908        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
2909
2910    def check_reentrant_write(self, data, **fdopen_kwargs):
2911        def on_alarm(*args):
2912            # Will be called reentrantly from the same thread
2913            wio.write(data)
2914            1//0
2915        signal.signal(signal.SIGALRM, on_alarm)
2916        r, w = os.pipe()
2917        wio = self.io.open(w, **fdopen_kwargs)
2918        try:
2919            signal.alarm(1)
2920            # Either the reentrant call to wio.write() fails with RuntimeError,
2921            # or the signal handler raises ZeroDivisionError.
2922            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
2923                while 1:
2924                    for i in range(100):
2925                        wio.write(data)
2926                        wio.flush()
2927                    # Make sure the buffer doesn't fill up and block further writes
2928                    os.read(r, len(data) * 100)
2929            exc = cm.exception
2930            if isinstance(exc, RuntimeError):
2931                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
2932        finally:
2933            wio.close()
2934            os.close(r)
2935
2936    def test_reentrant_write_buffered(self):
2937        self.check_reentrant_write(b"xy", mode="wb")
2938
2939    def test_reentrant_write_text(self):
2940        self.check_reentrant_write("xy", mode="w", encoding="ascii")
2941
2942    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
2943        """Check that a buffered read, when it gets interrupted (either
2944        returning a partial result or EINTR), properly invokes the signal
2945        handler and retries if the latter returned successfully."""
2946        r, w = os.pipe()
2947        fdopen_kwargs["closefd"] = False
2948        def alarm_handler(sig, frame):
2949            os.write(w, b"bar")
2950        signal.signal(signal.SIGALRM, alarm_handler)
2951        try:
2952            rio = self.io.open(r, **fdopen_kwargs)
2953            os.write(w, b"foo")
2954            signal.alarm(1)
2955            # Expected behaviour:
2956            # - first raw read() returns partial b"foo"
2957            # - second raw read() returns EINTR
2958            # - third raw read() returns b"bar"
2959            self.assertEqual(decode(rio.read(6)), "foobar")
2960        finally:
2961            rio.close()
2962            os.close(w)
2963            os.close(r)
2964
2965    def test_interrupterd_read_retry_buffered(self):
2966        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
2967                                          mode="rb")
2968
2969    def test_interrupterd_read_retry_text(self):
2970        self.check_interrupted_read_retry(lambda x: x,
2971                                          mode="r")
2972
2973    @unittest.skipUnless(threading, 'Threading required for this test.')
2974    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
2975        """Check that a buffered write, when it gets interrupted (either
2976        returning a partial result or EINTR), properly invokes the signal
2977        handler and retries if the latter returned successfully."""
2978        select = support.import_module("select")
2979        # A quantity that exceeds the buffer size of an anonymous pipe's
2980        # write end.
2981        N = support.PIPE_MAX_SIZE
2982        r, w = os.pipe()
2983        fdopen_kwargs["closefd"] = False
2984        # We need a separate thread to read from the pipe and allow the
2985        # write() to finish.  This thread is started after the SIGALRM is
2986        # received (forcing a first EINTR in write()).
2987        read_results = []
2988        write_finished = False
2989        def _read():
2990            while not write_finished:
2991                while r in select.select([r], [], [], 1.0)[0]:
2992                    s = os.read(r, 1024)
2993                    read_results.append(s)
2994        t = threading.Thread(target=_read)
2995        t.daemon = True
2996        def alarm1(sig, frame):
2997            signal.signal(signal.SIGALRM, alarm2)
2998            signal.alarm(1)
2999        def alarm2(sig, frame):
3000            t.start()
3001        signal.signal(signal.SIGALRM, alarm1)
3002        try:
3003            wio = self.io.open(w, **fdopen_kwargs)
3004            signal.alarm(1)
3005            # Expected behaviour:
3006            # - first raw write() is partial (because of the limited pipe buffer
3007            #   and the first alarm)
3008            # - second raw write() returns EINTR (because of the second alarm)
3009            # - subsequent write()s are successful (either partial or complete)
3010            self.assertEqual(N, wio.write(item * N))
3011            wio.flush()
3012            write_finished = True
3013            t.join()
3014            self.assertEqual(N, sum(len(x) for x in read_results))
3015        finally:
3016            write_finished = True
3017            os.close(w)
3018            os.close(r)
3019            # This is deliberate. If we didn't close the file descriptor
3020            # before closing wio, wio would try to flush its internal
3021            # buffer, and could block (in case of failure).
3022            try:
3023                wio.close()
3024            except IOError as e:
3025                if e.errno != errno.EBADF:
3026                    raise
3027
3028    def test_interrupterd_write_retry_buffered(self):
3029        self.check_interrupted_write_retry(b"x", mode="wb")
3030
3031    def test_interrupterd_write_retry_text(self):
3032        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3033
3034
3035class CSignalsTest(SignalsTest):
3036    io = io
3037
3038class PySignalsTest(SignalsTest):
3039    io = pyio
3040
3041    # Handling reentrancy issues would slow down _pyio even more, so the
3042    # tests are disabled.
3043    test_reentrant_write_buffered = None
3044    test_reentrant_write_text = None
3045
3046
3047def test_main():
3048    tests = (CIOTest, PyIOTest,
3049             CBufferedReaderTest, PyBufferedReaderTest,
3050             CBufferedWriterTest, PyBufferedWriterTest,
3051             CBufferedRWPairTest, PyBufferedRWPairTest,
3052             CBufferedRandomTest, PyBufferedRandomTest,
3053             StatefulIncrementalDecoderTest,
3054             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
3055             CTextIOWrapperTest, PyTextIOWrapperTest,
3056             CMiscIOTest, PyMiscIOTest,
3057             CSignalsTest, PySignalsTest,
3058             )
3059
3060    # Put the namespaces of the IO module we are testing and some useful mock
3061    # classes in the __dict__ of each test.
3062    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
3063             MockNonBlockWriterIO, MockRawIOWithoutRead)
3064    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
3065    c_io_ns = dict((name, getattr(io, name)) for name in all_members)
3066    py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
3067    globs = globals()
3068    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
3069    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
3070    # Avoid turning open into a bound method.
3071    py_io_ns["open"] = pyio.OpenWrapper
3072    for test in tests:
3073        if test.__name__.startswith("C"):
3074            for name, obj in c_io_ns.items():
3075                setattr(test, name, obj)
3076        elif test.__name__.startswith("Py"):
3077            for name, obj in py_io_ns.items():
3078                setattr(test, name, obj)
3079
3080    support.run_unittest(*tests)
3081
3082if __name__ == "__main__":
3083    test_main()
3084