1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22from __future__ import print_function
23from __future__ import unicode_literals
24
25import os
26import sys
27import time
28import array
29import random
30import unittest
31import weakref
32import warnings
33import abc
34import signal
35import errno
36from itertools import cycle, count
37from collections import deque
38from UserList import UserList
39from test import test_support as support
40import contextlib
41
42import codecs
43import io  # C implementation of io
44import _pyio as pyio # Python implementation of io
45try:
46    import threading
47except ImportError:
48    threading = None
49try:
50    import fcntl
51except ImportError:
52    fcntl = None
53
# Python 2 idiom: with this module-level assignment, every class statement
# below that names no base class still creates a new-style class.
__metaclass__ = type
# Rebind ``bytes`` so that every ``bytes(x)`` call in this module goes through
# the test-support helper.
# NOTE(review): presumably this emulates the Python 3 bytes constructor on
# Python 2 -- confirm against test_support.py3k_bytes.
bytes = support.py3k_bytes
56
def byteslike(*pos, **kw):
    """Return a memoryview over a freshly built bytearray.

    All positional and keyword arguments are forwarded unchanged to
    bytearray().
    """
    backing = bytearray(*pos, **kw)
    return memoryview(backing)
59
60def _default_chunk_size():
61    """Get the default TextIOWrapper chunk size"""
62    with io.open(__file__, "r", encoding="latin1") as f:
63        return f._CHUNK_SIZE
64
65
class MockRawIOWithoutRead:
    """Scripted raw stream that provides readinto() but no read().

    Because read() is deliberately missing, mixing this class with a
    RawIOBase exercises the default RawIOBase.read(), which is implemented
    on top of readinto().

    ``read_stack`` is the queue of chunks handed out by successive reads; a
    ``None`` entry makes the corresponding read return None.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Bookkeeping inspected by the tests: total number of read calls and
        # how many of them happened after the script was exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    # --- capabilities: everything is allowed ---

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # --- positioning: fixed dummy answers ---

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def truncate(self, pos=None):
        return pos

    # --- data transfer ---

    def write(self, b):
        # Record an immutable copy and report everything as written.
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            # Script exhausted: count the call and signal end of file.
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            # A None entry is consumed and passed on as the result.
            del self._read_stack[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits: hand out what fits and keep the
            # remainder at the head of the queue for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        del self._read_stack[0]
        buf[:size] = chunk
        return size
121
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""
    pass
124
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the pure-Python (_pyio) RawIOBase."""
    pass
127
128
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with a scripted read()."""

    def read(self, n=None):
        """Return the next scripted chunk, ignoring the requested size ``n``.

        Every call is counted in ``_reads``.  Once the script is exhausted
        the call is also counted in ``_extraneous_reads`` and b"" is
        returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only "script exhausted" is expected here; anything else
            # (KeyboardInterrupt, a genuine bug, ...) must propagate.
            self._extraneous_reads += 1
            return b""
138
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""
    pass
141
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the pure-Python (_pyio) RawIOBase."""
    pass
144
145
class MisbehavedRawIO(MockRawIO):
    """Raw stream double whose methods deliberately report bogus results.

    Each method performs the MockRawIO operation (where there is one) and
    then returns a value that does not match what actually happened.
    """

    def write(self, b):
        # Claim twice as many bytes as were actually recorded.
        stored = MockRawIO.write(self, b)
        return stored * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = MockRawIO.read(self, n)
        return chunk * 2

    def seek(self, pos, whence):
        # Negative file position.
        return -123

    def tell(self):
        # Negative file position.
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then claim five times its capacity.
        MockRawIO.readinto(self, buf)
        capacity = len(buf)
        return capacity * 5
162
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""
    pass
165
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the pure-Python (_pyio) RawIOBase."""
    pass
168
169
class CloseFailureIO(MockRawIO):
    """Raw stream double whose first close() raises IOError.

    The stream is marked closed before the error is raised, so any later
    close() call returns silently.
    """

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise IOError
177
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""
    pass
180
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the pure-Python (_pyio) RawIOBase."""
    pass
183
184
class MockFileIO:
    """Mixin that records the size of every read in ``read_history``.

    It delegates to the next class in the MRO through super(), so it has to
    be combined with a real stream class that accepts the initial data.
    """

    def __init__(self, data):
        # The history must exist before the underlying stream is set up.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        # A None result is logged as None, anything else by its length.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        # readinto() already returns a count (or None); log it as is.
        count = super(MockFileIO, self).readinto(b)
        self.read_history.append(count)
        return count
200
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""
    pass
203
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the pure-Python (_pyio) BytesIO."""
    pass
206
207
class MockNonBlockWriterIO:
    """Writable raw stream double that can simulate a would-block condition.

    Written data is collected in ``_write_stack``.  After block_on(char),
    a write() stops short at the first occurrence of ``char``; a write that
    starts with ``char`` returns None and clears the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        # Return everything written so far and empty the log in place.
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        if self._blocker_char:
            cut = data.find(self._blocker_char)
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(data[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
            # cut < 0: no blocker in this data, write it all below
        self._write_stack.append(data)
        return len(data)
251
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""
    # Exposes the matching implementation's BlockingIOError on the class.
    BlockingIOError = io.BlockingIOError
254
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the pure-Python (_pyio) RawIOBase."""
    # Exposes the matching implementation's BlockingIOError on the class.
    BlockingIOError = pyio.BlockingIOError
257
258
class IOTest(unittest.TestCase):
    """Tests of open() and of the raw, buffered and text layers it returns.

    The tests use several attributes that this class does not define itself
    (self.open, self.FileIO, self.BytesIO, self.IOBase, self.RawIOBase,
    self.BufferedIOBase, self.TextIOBase, self.MockRawIOWithoutRead); they
    have to be provided from outside the class body.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    # Helper: run a fixed write/seek/truncate scenario on a writable binary
    # stream.
    def write_ops(self, f):
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must not move the file position.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        buffer = bytearray(b" world\n\n\n")
        self.assertEqual(f.write(buffer), 9)
        buffer[:] = b"*" * 9  # Overwrite our copy of the data
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again, truncating leaves the position where it was.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    # Helper: run a fixed read/readinto/seek scenario on a readable binary
    # stream whose content is b"hello world\n".  With buffered=True the
    # argument-less read() is checked as well.
    def read_ops(self, f, buffered=False):
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = byteslike(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data.tobytes(), b" worl")
        data = bytearray(5)
        # Only two bytes are left; the target buffer keeps its length.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(byteslike(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(byteslike()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    # Helper: seek/write/truncate/read around offset LARGE on a stream that
    # is both readable and writable.
    def large_file_ops(self, f):
        # Use unittest assertions rather than ``assert`` statements: the
        # latter are removed when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_open_handles_NUL_chars(self):
        # A NUL character in the file name is rejected with TypeError, both
        # for text and for bytes file names.
        fn_with_NUL = 'foo\0bar'
        self.assertRaises(TypeError, self.open, fn_with_NUL, 'w')

        bytes_fn = fn_with_NUL.encode('ascii')
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            self.assertRaises(TypeError, self.open, bytes_fn, 'w')

    def test_raw_file_io(self):
        # buffering=0 gives the unbuffered (raw) file object.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            # A size limit may cut a line short; the rest comes next time.
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # Same scenarios as for files, on an in-memory BytesIO.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            support.requires(
                'largefile',
                'test requires %s bytes and a long time to run' % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The with statement closes the file on normal exit and when the
        # body raises, for every buffering mode.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # Dropping the last reference to a FileIO subclass must call
        # __del__, then close(), then flush() (recorded as 1, 2, 3), and the
        # written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    # Helper: same destructor ordering check for a subclass of the abstract
    # class ``base``.
    def _check_base_destructor(self, base):
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # Data written before the file is closed must be readable afterwards.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() of an array must report the size in bytes, not the number
        # of items.
        a = array.array(b'i', range(10))
        n = len(a.tostring())
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            # Second file object on the same descriptor, which it must not
            # close.
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference: the object can only go away through the cycle
        # collector.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    # Helper: open a file with the given arguments, make its flush() fail and
    # check how close() copes.
    def check_flush_error_on_close(self, *args, **kwargs):
        # Test that the file is closed despite failed flush
        # and that flush() is called before file closed.
        f = self.open(*args, **kwargs)
        closed = []
        def bad_flush():
            # Remember whether the file was already closed at flush time.
            closed[:] = [f.closed]
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        f.flush = lambda: None  # break reference loop

    def test_flush_error_on_close(self):
        # Each layer is checked three ways: opened by name, by descriptor,
        # and by descriptor with closefd=False (in which case the descriptor
        # is closed here by hand).
        # raw file
        # Issue #5700: io.FileIO calls flush() after file closed
        self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb', buffering=0)
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
        os.close(fd)
        # buffered io
        self.check_flush_error_on_close(support.TESTFN, 'wb')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'wb', closefd=False)
        os.close(fd)
        # text io
        self.check_flush_error_on_close(support.TESTFN, 'w')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'w')
        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
        self.check_flush_error_on_close(fd, 'w', closefd=False)
        os.close(fd)

    def test_multi_close(self):
        # close() may be called repeatedly; flush() on a closed file fails.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")

    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()

    def test_nonbuffered_textio(self):
        # Unbuffered text I/O is refused, and the failed open() must not
        # leave anything behind that emits a warning when collected.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', buffering=0)
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_invalid_newline(self):
        # Same as above for an invalid newline argument.
        with warnings.catch_warnings(record=True) as recorded:
            with self.assertRaises(ValueError):
                self.open(support.TESTFN, 'w', newline='invalid')
            support.gc_collect()
        self.assertEqual(recorded, [])

    def test_buffered_readinto_mixin(self):
        # Test the implementation provided by BufferedIOBase
        class Stream(self.BufferedIOBase):
            def read(self, size):
                return b"12345"
        stream = Stream()
        buffer = byteslike(5)
        self.assertEqual(stream.readinto(buffer), 5)
        self.assertEqual(buffer.tobytes(), b"12345")
676
677
class CIOTest(IOTest):
    # IOTest plus a regression test that is only defined for this variant.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference puts the instance into a reference cycle.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop the class and the instance, then let the collector run; the
        # test passes if this does not crash and the instance is gone.
        del MyIO
        del obj
        support.gc_collect()
        self.assertIsNone(wr(), wr)
697
class PyIOTest(IOTest):
    # IOTest with the inherited test_array_writes replaced by a version that
    # is always skipped, for the reason given in the message.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)
702
703
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the class under test is taken from self.tp and the raw stream
    # doubles from self.MockRawIO / self.CloseFailureIO, none of which are
    # defined here.

    def test_detach(self):
        # detach() returns the raw stream once; a second call fails.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work

    def test_fileno(self):
        # fileno() is forwarded to the raw stream (the mock answers 42).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Destroying a subclass instance must call its __del__ (1), then its
        # close() (2) and, for writable objects only, its flush() (3).
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Must be queried before the object is deleted.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # The temporary buffered object is dropped while the
            # AttributeError for the missing attribute propagates.
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # The repr shows the raw stream's name once it has one; text and
        # bytes names are rendered differently (u'...' versus '...').
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed = []
        def bad_flush():
            # Remember the closed state of both layers at flush time.
            closed[:] = [b.closed, raw.closed]
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the error from the raw
        # close() is the one that is reported, and the buffered object does
        # not report itself as closed.
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError('flush')
        def bad_close():
            raise IOError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(IOError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertFalse(b.closed)

    def test_multi_close(self):
        # close() may be called repeatedly; flush() on a closed object fails.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_readonly_attributes(self):
        # The raw attribute cannot be rebound.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises((AttributeError, TypeError)):
            buf.raw = x
839
840
class SizeofTest:
    # Mixin: self.tp and self.MockRawIO are not defined here and must come
    # from the class it is combined with.

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() must grow by exactly the difference in buffer_size.
        small, large = 4096, 8192
        first = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(first) - small
        second = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(second), overhead + large)
853
854
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered readers; subclasses bind
    # `tp` to the concrete class under test.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() can be re-run on a live object; the buffer sizes 0, -16
        # and -1 are rejected with ValueError.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        reader.__init__(raw)
        for size in (1024, 16):
            reader.__init__(raw, buffer_size=size)
        self.assertEqual(b"abc", reader.read())
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw,
                              buffer_size=bad_size)
        raw = self.MockRawIO([b"abc"])
        reader.__init__(raw)
        self.assertEqual(b"abc", reader.read())

    def test_uninitialized(self):
        # Destroying an object that never went through __init__() must not
        # blow up.
        cls = self.tp
        discarded = cls.__new__(cls)
        del discarded
        reader = cls.__new__(cls)
        # Reading before __init__() has run fails with a clear error.
        with self.assertRaisesRegexp((ValueError, AttributeError),
                                     'uninitialized|has no attribute'):
            reader.read(0)
        reader.__init__(self.MockRawIO())
        self.assertEqual(reader.read(0), b'')

    def test_read(self):
        # read(None) and read(7) both return the whole 7-byte stream.
        for size in (None, 7):
            reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
            self.assertEqual(b"abcdefg", reader.read(size))
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read(-2)

    def test_read1(self):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        self.assertEqual(b"a", reader.read(1))
        # (requested size, expected data, raw read count after the call)
        steps = [
            (1, b"b", 1),
            (100, b"c", 1),
            (100, b"d", 2),
            (100, b"efg", 3),
            (100, b"", 4),
        ]
        for size, expected, raw_reads in steps:
            self.assertEqual(expected, reader.read1(size))
            self.assertEqual(raw._reads, raw_reads)
        # Invalid args
        with self.assertRaises(ValueError):
            reader.read1(-1)

    def test_readinto(self):
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
        target = bytearray(2)
        # (expected return value, expected contents of the target buffer);
        # the short final read only overwrites the first byte, hence b"gf".
        expectations = [(2, b"ab"), (2, b"cd"), (2, b"ef"),
                        (1, b"gf"), (0, b"gf")]
        for count, contents in expectations:
            self.assertEqual(reader.readinto(target), count)
            self.assertEqual(target, contents)

    def test_readlines(self):
        def make_reader():
            # A fresh reader over the same three raw chunks for each call.
            return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))
        everything = [b"abc\n", b"d\n", b"ef"]
        self.assertEqual(make_reader().readlines(), everything)
        self.assertEqual(make_reader().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(make_reader().readlines(None), everything)

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # (buffer size, sizes passed to read(), sizes seen by the raw stream)
        cases = (
            (100, [3, 1, 4, 8], [dlen, 0]),
            (100, [3, 3, 3], [dlen]),
            (4, [1, 2, 4, 2], [4, 4, 1]),
        )

        for bufsize, requests, expected_raw_reads in cases:
            raw = self.MockFileIO(data)
            reader = self.tp(raw, buffer_size=bufsize)
            offset = 0
            for nbytes in requests:
                end = offset + nbytes
                self.assertEqual(reader.read(nbytes), data[offset:end])
                offset = end
            # this is mildly implementation-dependent
            self.assertEqual(raw.read_history, expected_raw_reads)

    def test_read_non_blocking(self):
        # The None entries simulate EWOULDBLOCK coming from the raw stream.
        raw = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        reader = self.tp(raw)
        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertEqual(b"", reader.peek(1))
        self.assertIsNone(reader.read())
        self.assertEqual(b"", reader.read())

        # Same idea, exercised directly on the raw mock via readall().
        raw = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", raw.readall())
        self.assertIsNone(raw.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read(9000))

    def test_read_all(self):
        # read() without a size returns the concatenation of all chunks.
        reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

        self.assertEqual(b"abcdefg", reader.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # The file holds every byte value exactly `per_value` times, in
            # shuffled order, so duplicated or dropped data shows up in the
            # per-value counts checked at the end.
            per_value = 1000
            values = list(range(256)) * per_value
            random.shuffle(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(bytes(bytearray(values)))
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                reader = self.tp(raw, 8)
                errors = []
                chunks = []
                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for size in cycle([1, 19]):
                            chunk = reader.read(size)
                            if not chunk:
                                break
                            # list.append() is atomic
                            chunks.append(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise
                threads = [threading.Thread(target=worker) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(chunks)
                for value in range(256):
                    needle = bytes(bytearray([value]))
                    self.assertEqual(combined.count(needle), per_value)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # With a misbehaving raw stream, seek() and tell() raise IOError.
        reader = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        with self.assertRaises(IOError):
            reader.seek(0)
        with self.assertRaises(IOError):
            reader.tell()

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            expected = b"x" * n
            # First the simple case where one raw read is enough to satisfy
            # the request, then a more complex case where two raw reads are
            # needed.
            for pieces in ([b"x" * n], [b"x" * (n - 1), b"x"]):
                raw = self.MockRawIO(pieces)
                reader = self.tp(raw, bufsize)
                self.assertEqual(reader.read(n), expected)
                self.assertEqual(
                    raw._extraneous_reads, 0,
                    "failed for {}: {} != 0".format(n, raw._extraneous_reads))
1043
1044
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the BufferedReaderTest suite, plus the checks below, against the
    # C implementation (io.BufferedReader).
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() call, read() must raise ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so
        # checking this there is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        # Opening with "w+b" creates the file on disk: make sure it is
        # removed again whatever the outcome of the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Reference cycle: only the cyclic garbage collector can free `f`.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # The TypeError for surplus arguments must mention "BufferedReader".
        with self.assertRaisesRegexp(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1090
1091
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest suite against the pure-Python
    # implementation from _pyio.
    tp = pyio.BufferedReader
1094
1095
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers; subclasses bind
    # `tp` to the concrete class under test.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() can be re-run on a live object; the buffer sizes 0, -16
        # and -1 are rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Destroying an object that never went through __init__() must not
        # blow up, and writing to one must fail with a clear error.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegexp((ValueError, AttributeError),
                                'uninitialized|has no attribute',
                                bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() pushes the pending data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        # Named `data` rather than `buffer` so the builtin is not shadowed.
        data = bytearray(b"def")
        bufio.write(data)
        data[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the buffered object after each
        # write().
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents,
            b"".join(writer._write_stack))

    def test_writes(self):
        # Plain writes, nothing in between.
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        # flush() after every write.
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeks that end up back at the current position after every write:
        # first with absolute offsets, then with relative ones.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        # truncate() at the current position after every write.
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking back and overwriting the start of the data.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        # flush() hands the buffered bytes to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        # writelines() with a plain list of byte strings.
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() with a UserList instead of a real list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # A list of ints and None are both rejected with TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)

    def test_destructor(self):
        # Dropping the last reference must flush the pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        # The test creates a real file: register its removal up front so it
        # is deleted whether the assertions pass or fail.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, so compare per-value counts.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # With a misbehaving raw stream, seek(), tell() and an overflowing
        # write() all raise IOError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument triggers a DeprecationWarning.
        with support.check_warnings(("max_buffer_size is deprecated",
                                     DeprecationWarning)):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # A failing raw write during close() must propagate, and the object
        # must still end up closed.
        raw = self.MockRawIO()
        def bad_write(b):
            raise IOError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
1356
1357
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the BufferedWriterTest suite, plus the checks below, against the
    # C implementation (io.BufferedWriter).
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed __init__() call, write() must raise ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead.
        # The test creates a real file: make sure it is removed again
        # whatever the outcome of the test.
        self.addCleanup(support.unlink, support.TESTFN)
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Reference cycle: only the cyclic garbage collector can free `f`.
        f.x = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # The TypeError for surplus arguments must mention "BufferedWriter".
        with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1400
1401
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest suite against the pure-Python
    # implementation from _pyio.
    tp = pyio.BufferedWriter
1404
class BufferedRWPairTest(unittest.TestCase):
    # Implementation-independent tests for BufferedRWPair; subclasses bind
    # `tp` to the concrete class under test.

    def test_constructor(self):
        # A freshly built pair is open.
        reader, writer = self.MockRawIO(), self.MockRawIO()
        rwpair = self.tp(reader, writer)
        self.assertFalse(rwpair.closed)

    def test_uninitialized(self):
        # Destroying a pair that never went through __init__() must not
        # blow up.
        cls = self.tp
        discarded = cls.__new__(cls)
        del discarded
        rwpair = cls.__new__(cls)
        # Neither direction is usable before __init__() has run.
        failure = ((ValueError, AttributeError),
                   'uninitialized|has no attribute')
        with self.assertRaisesRegexp(*failure):
            rwpair.read(0)
        with self.assertRaisesRegexp(*failure):
            rwpair.write(b'')
        rwpair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(rwpair.read(0), b'')
        self.assertEqual(rwpair.write(b''), 0)

    def test_detach(self):
        # detach() is not supported on a pair.
        rwpair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rwpair.detach()

    def test_constructor_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument triggers a DeprecationWarning.
        expected_warning = ("max_buffer_size is deprecated",
                            DeprecationWarning)
        with support.check_warnings(expected_warning):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        # The reader side must be readable.
        reader, writer = NotReadable(), self.MockRawIO()
        with self.assertRaises(IOError):
            self.tp(reader, writer)

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        # The writer side must be writable.
        reader, writer = self.MockRawIO(), NotWriteable()
        with self.assertRaises(IOError):
            self.tp(reader, writer)

    def test_read(self):
        rwpair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # (arguments to read(), expected result)
        for args, expected in (((3,), b"abc"), ((1,), b"d"), ((), b"ef")):
            self.assertEqual(rwpair.read(*args), expected)
        rwpair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rwpair.read(None), b"abc")

    def test_readlines(self):
        def make_pair():
            # A fresh pair over the same input for each call.
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        all_lines = [b"abc\n", b"def\n", b"h"]
        for _ in range(2):
            self.assertEqual(make_pair().readlines(), all_lines)
        self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rwpair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rwpair.read1(3), b"abc")

    def test_readinto(self):
        rwpair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # The target is a 5-byte memoryview over a bytearray.
        target = byteslike(5)
        self.assertEqual(rwpair.readinto(target), 5)
        self.assertEqual(target.tobytes(), b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rwpair = self.tp(self.MockRawIO(), sink)

        rwpair.write(b"abc")
        rwpair.flush()
        payload = bytearray(b"def")
        rwpair.write(payload)
        payload[:] = b"***"  # Overwrite our copy of the data
        rwpair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rwpair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() does not consume: the following read() sees the same bytes.
        peeked = rwpair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(rwpair.read(3), b"abc")

    def test_readable(self):
        rwpair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rwpair.readable())

    def test_writeable(self):
        rwpair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rwpair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rwpair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rwpair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        # `closed` flips from false to true across close().
        rwpair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rwpair.closed)
        rwpair.close()
        self.assertTrue(rwpair.closed)

    def test_reader_close_error_on_close(self):
        def reader_close():
            # Deliberately undefined name: raises NameError when called.
            reader_non_existing
        reader, writer = self.MockRawIO(), self.MockRawIO()
        reader.close = reader_close
        rwpair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rwpair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        # The writer side was closed even though the reader's close() failed.
        self.assertTrue(rwpair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        def writer_close():
            # Deliberately undefined name: raises NameError when called.
            writer_non_existing
        reader, writer = self.MockRawIO(), self.MockRawIO()
        writer.close = writer_close
        rwpair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rwpair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        # The reader side was closed even though the writer's close() failed.
        self.assertFalse(rwpair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        def reader_close():
            # Deliberately undefined name: raises NameError when called.
            reader_non_existing
        def writer_close():
            # Deliberately undefined name: raises NameError when called.
            writer_non_existing
        reader, writer = self.MockRawIO(), self.MockRawIO()
        reader.close = reader_close
        writer.close = writer_close
        rwpair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rwpair.close()
        # When both sides fail, the reader's error is the one reported.
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertFalse(rwpair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, expected pair.isatty() result):
        # the pair reports a tty as soon as either side is one.
        cases = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, expected in cases:
            rwpair = self.tp(SelectableIsAtty(reader_tty),
                             SelectableIsAtty(writer_tty))
            check = self.assertTrue if expected else self.assertFalse
            check(rwpair.isatty())

    def test_weakref_clearing(self):
        rwpair = self.tp(self.MockRawIO(), self.MockRawIO())
        watcher = weakref.ref(rwpair)
        rwpair = None
        watcher = None # Shouldn't segfault.
1589
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest suite against the C implementation.
    tp = io.BufferedRWPair
1592
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest suite against the pure-Python
    # implementation from _pyio.
    tp = pyio.BufferedRWPair
1595
1596
1597class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
1598    read_mode = "rb+"
1599    write_mode = "wb+"
1600
1601    def test_constructor(self):
1602        BufferedReaderTest.test_constructor(self)
1603        BufferedWriterTest.test_constructor(self)
1604
1605    def test_uninitialized(self):
1606        BufferedReaderTest.test_uninitialized(self)
1607        BufferedWriterTest.test_uninitialized(self)
1608
1609    def test_read_and_write(self):
1610        raw = self.MockRawIO((b"asdf", b"ghjk"))
1611        rw = self.tp(raw, 8)
1612
1613        self.assertEqual(b"as", rw.read(2))
1614        rw.write(b"ddd")
1615        rw.write(b"eee")
1616        self.assertFalse(raw._write_stack) # Buffer writes
1617        self.assertEqual(b"ghjk", rw.read())
1618        self.assertEqual(b"dddeee", raw._write_stack[0])
1619
1620    def test_seek_and_tell(self):
1621        raw = self.BytesIO(b"asdfghjkl")
1622        rw = self.tp(raw)
1623
1624        self.assertEqual(b"as", rw.read(2))
1625        self.assertEqual(2, rw.tell())
1626        rw.seek(0, 0)
1627        self.assertEqual(b"asdf", rw.read(4))
1628
1629        rw.write(b"123f")
1630        rw.seek(0, 0)
1631        self.assertEqual(b"asdf123fl", rw.read())
1632        self.assertEqual(9, rw.tell())
1633        rw.seek(-4, 2)
1634        self.assertEqual(5, rw.tell())
1635        rw.seek(2, 1)
1636        self.assertEqual(7, rw.tell())
1637        self.assertEqual(b"fl", rw.read(11))
1638        rw.flush()
1639        self.assertEqual(b"asdf123fl", raw.getvalue())
1640
1641        self.assertRaises(TypeError, rw.seek, 0.0)
1642
1643    def check_flush_and_read(self, read_func):
1644        raw = self.BytesIO(b"abcdefghi")
1645        bufio = self.tp(raw)
1646
1647        self.assertEqual(b"ab", read_func(bufio, 2))
1648        bufio.write(b"12")
1649        self.assertEqual(b"ef", read_func(bufio, 2))
1650        self.assertEqual(6, bufio.tell())
1651        bufio.flush()
1652        self.assertEqual(6, bufio.tell())
1653        self.assertEqual(b"ghi", read_func(bufio))
1654        raw.seek(0, 0)
1655        raw.write(b"XYZ")
1656        # flush() resets the read buffer
1657        bufio.flush()
1658        bufio.seek(0, 0)
1659        self.assertEqual(b"XYZ", read_func(bufio, 3))
1660
1661    def test_flush_and_read(self):
1662        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
1663
1664    def test_flush_and_readinto(self):
1665        def _readinto(bufio, n=-1):
1666            b = bytearray(n if n >= 0 else 9999)
1667            n = bufio.readinto(b)
1668            return bytes(b[:n])
1669        self.check_flush_and_read(_readinto)
1670
1671    def test_flush_and_peek(self):
1672        def _peek(bufio, n=-1):
1673            # This relies on the fact that the buffer can contain the whole
1674            # raw stream, otherwise peek() can return less.
1675            b = bufio.peek(n)
1676            if n != -1:
1677                b = b[:n]
1678            bufio.seek(len(b), 1)
1679            return b
1680        self.check_flush_and_read(_peek)
1681
1682    def test_flush_and_write(self):
1683        raw = self.BytesIO(b"abcdefghi")
1684        bufio = self.tp(raw)
1685
1686        bufio.write(b"123")
1687        bufio.flush()
1688        bufio.write(b"45")
1689        bufio.flush()
1690        bufio.seek(0, 0)
1691        self.assertEqual(b"12345fghi", raw.getvalue())
1692        self.assertEqual(b"12345fghi", bufio.read())
1693
1694    def test_threads(self):
1695        BufferedReaderTest.test_threads(self)
1696        BufferedWriterTest.test_threads(self)
1697
1698    def test_writes_and_peek(self):
1699        def _peek(bufio):
1700            bufio.peek(1)
1701        self.check_writes(_peek)
1702        def _peek(bufio):
1703            pos = bufio.tell()
1704            bufio.seek(-1, 1)
1705            bufio.peek(1)
1706            bufio.seek(pos, 0)
1707        self.check_writes(_peek)
1708
1709    def test_writes_and_reads(self):
1710        def _read(bufio):
1711            bufio.seek(-1, 1)
1712            bufio.read(1)
1713        self.check_writes(_read)
1714
1715    def test_writes_and_read1s(self):
1716        def _read1(bufio):
1717            bufio.seek(-1, 1)
1718            bufio.read1(1)
1719        self.check_writes(_read1)
1720
1721    def test_writes_and_readintos(self):
1722        def _read(bufio):
1723            bufio.seek(-1, 1)
1724            bufio.readinto(bytearray(1))
1725        self.check_writes(_read)
1726
1727    def test_write_after_readahead(self):
1728        # Issue #6629: writing after the buffer was filled by readahead should
1729        # first rewind the raw stream.
1730        for overwrite_size in [1, 5]:
1731            raw = self.BytesIO(b"A" * 10)
1732            bufio = self.tp(raw, 4)
1733            # Trigger readahead
1734            self.assertEqual(bufio.read(1), b"A")
1735            self.assertEqual(bufio.tell(), 1)
1736            # Overwriting should rewind the raw stream if it needs so
1737            bufio.write(b"B" * overwrite_size)
1738            self.assertEqual(bufio.tell(), overwrite_size + 1)
1739            # If the write size was smaller than the buffer size, flush() and
1740            # check that rewind happens.
1741            bufio.flush()
1742            self.assertEqual(bufio.tell(), overwrite_size + 1)
1743            s = raw.getvalue()
1744            self.assertEqual(s,
1745                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
1746
1747    def test_write_rewind_write(self):
1748        # Various combinations of reading / writing / seeking backwards / writing again
1749        def mutate(bufio, pos1, pos2):
1750            assert pos2 >= pos1
1751            # Fill the buffer
1752            bufio.seek(pos1)
1753            bufio.read(pos2 - pos1)
1754            bufio.write(b'\x02')
1755            # This writes earlier than the previous write, but still inside
1756            # the buffer.
1757            bufio.seek(pos1)
1758            bufio.write(b'\x01')
1759
1760        b = b"\x80\x81\x82\x83\x84"
1761        for i in range(0, len(b)):
1762            for j in range(i, len(b)):
1763                raw = self.BytesIO(b)
1764                bufio = self.tp(raw, 100)
1765                mutate(bufio, i, j)
1766                bufio.flush()
1767                expected = bytearray(b)
1768                expected[j] = 2
1769                expected[i] = 1
1770                self.assertEqual(raw.getvalue(), expected,
1771                                 "failed result for i=%d, j=%d" % (i, j))
1772
1773    def test_truncate_after_read_or_write(self):
1774        raw = self.BytesIO(b"A" * 10)
1775        bufio = self.tp(raw, 100)
1776        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
1777        self.assertEqual(bufio.truncate(), 2)
1778        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
1779        self.assertEqual(bufio.truncate(), 4)
1780
1781    def test_misbehaved_io(self):
1782        BufferedReaderTest.test_misbehaved_io(self)
1783        BufferedWriterTest.test_misbehaved_io(self)
1784
1785    def test_interleaved_read_write(self):
1786        # Test for issue #12213
1787        with self.BytesIO(b'abcdefgh') as raw:
1788            with self.tp(raw, 100) as f:
1789                f.write(b"1")
1790                self.assertEqual(f.read(1), b'b')
1791                f.write(b'2')
1792                self.assertEqual(f.read1(1), b'd')
1793                f.write(b'3')
1794                buf = bytearray(1)
1795                f.readinto(buf)
1796                self.assertEqual(buf, b'f')
1797                f.write(b'4')
1798                self.assertEqual(f.peek(1), b'h')
1799                f.flush()
1800                self.assertEqual(raw.getvalue(), b'1b2d3f4h')
1801
1802        with self.BytesIO(b'abc') as raw:
1803            with self.tp(raw, 100) as f:
1804                self.assertEqual(f.read(1), b'a')
1805                f.write(b"2")
1806                self.assertEqual(f.read(1), b'c')
1807                f.flush()
1808                self.assertEqual(raw.getvalue(), b'a2c')
1809
1810    def test_interleaved_readline_write(self):
1811        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
1812            with self.tp(raw) as f:
1813                f.write(b'1')
1814                self.assertEqual(f.readline(), b'b\n')
1815                f.write(b'2')
1816                self.assertEqual(f.readline(), b'def\n')
1817                f.write(b'3')
1818                self.assertEqual(f.readline(), b'\n')
1819                f.flush()
1820                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
1821
1822
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    # Runs the BufferedRandom tests against the C implementation of io.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel.
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Asking for a sys.maxsize-byte buffer must fail cleanly.
        expected_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected_errors,
            buffered.__init__, raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reader checks first, then writer checks.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must name
        # BufferedRandom.
        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1845
1846
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest cases against the pure-Python
    # implementation (_pyio).
    tp = pyio.BufferedRandom
1849
1850
1851# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1852# properties:
1853#   - A single output character can correspond to many bytes of input.
1854#   - The number of input bytes to complete the character can be
1855#     undetermined until the last input byte is received.
1856#   - The number of input bytes can vary depending on previous input.
1857#   - A single input byte can correspond to many characters of output.
1858#   - The number of output characters can be undetermined until the
1859#     last input byte is received.
1860#   - The number of output characters can vary depending on previous input.
1861
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        # The second item is named `flags` so it does not shadow the io module.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode the bytes-like *input* and return the output produced.

        Bytes that do not complete a word stay buffered for a later call.
        If *final* is true, any buffered bytes are emitted as a last word.
        """
        output = ''
        period = ord('.')
        # Go through a bytearray so every item is an int whatever bytes-like
        # type was passed (byte string, bytearray, memoryview); comparing
        # items with '.' only worked when iteration produced 1-char strings.
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words only update I or O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Toggled by tests; while false, lookupTestDecoder() finds nothing.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: serve 'test_decoder' while enabled.

        Returns None (implicitly) for any other name or while disabled.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1946
# Register the lookup function of the decoder above so that it can be
# selected under the encoding name 'test_decoder'.
# Disabled by default (codecEnabled is False); tests enable it when needed.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1950
1951
1952class StatefulIncrementalDecoderTest(unittest.TestCase):
1953    """
1954    Make sure the StatefulIncrementalDecoder actually works.
1955    """
1956
1957    test_cases = [
1958        # I=1, O=1 (fixed-length input == fixed-length output)
1959        (b'abcd', False, 'a.b.c.d.'),
1960        # I=0, O=0 (variable-length input, variable-length output)
1961        (b'oiabcd', True, 'abcd.'),
1962        # I=0, O=0 (should ignore extra periods)
1963        (b'oi...abcd...', True, 'abcd.'),
1964        # I=0, O=6 (variable-length input, fixed-length output)
1965        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
1966        # I=2, O=6 (fixed-length input < fixed-length output)
1967        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
1968        # I=6, O=3 (fixed-length input > fixed-length output)
1969        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
1970        # I=0, then 3; O=29, then 15 (with longer output)
1971        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
1972         'a----------------------------.' +
1973         'b----------------------------.' +
1974         'cde--------------------------.' +
1975         'abcdefghijabcde.' +
1976         'a.b------------.' +
1977         '.c.------------.' +
1978         'd.e------------.' +
1979         'k--------------.' +
1980         'l--------------.' +
1981         'm--------------.')
1982    ]
1983
1984    def test_decoder(self):
1985        # Try a few one-shot test cases.
1986        for input, eof, output in self.test_cases:
1987            d = StatefulIncrementalDecoder()
1988            self.assertEqual(d.decode(input, eof), output)
1989
1990        # Also test an unfinished decode, followed by forcing EOF.
1991        d = StatefulIncrementalDecoder()
1992        self.assertEqual(d.decode(b'oiabcd'), '')
1993        self.assertEqual(d.decode(b'', 1), 'abcd.')
1994
1995class TextIOWrapperTest(unittest.TestCase):
1996
    def setUp(self):
        # Sample bytes mixing "\r\n", "\r" and "\n" line endings, and the same
        # text with every line ending written as "\n".
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        # Remove any file left at the shared test path before the test runs.
        support.unlink(support.TESTFN)
2001
    def tearDown(self):
        # Remove the file a test may have created at the shared test path.
        support.unlink(support.TESTFN)
2004
2005    def test_constructor(self):
2006        r = self.BytesIO(b"\xc3\xa9\n\n")
2007        b = self.BufferedReader(r, 1000)
2008        t = self.TextIOWrapper(b)
2009        t.__init__(b, encoding="latin1", newline="\r\n")
2010        self.assertEqual(t.encoding, "latin1")
2011        self.assertEqual(t.line_buffering, False)
2012        t.__init__(b, encoding="utf8", line_buffering=True)
2013        self.assertEqual(t.encoding, "utf8")
2014        self.assertEqual(t.line_buffering, True)
2015        self.assertEqual("\xe9\n", t.readline())
2016        self.assertRaises(TypeError, t.__init__, b, newline=42)
2017        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2018
2019    def test_uninitialized(self):
2020        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2021        del t
2022        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2023        self.assertRaises(Exception, repr, t)
2024        self.assertRaisesRegexp((ValueError, AttributeError),
2025                                'uninitialized|has no attribute',
2026                                t.read, 0)
2027        t.__init__(self.MockRawIO())
2028        self.assertEqual(t.read(0), u'')
2029
2030    def test_non_text_encoding_codecs_are_rejected(self):
2031        # Ensure the constructor complains if passed a codec that isn't
2032        # marked as a text encoding
2033        # http://bugs.python.org/issue20404
2034        r = self.BytesIO()
2035        b = self.BufferedWriter(r)
2036        with support.check_py3k_warnings():
2037            self.TextIOWrapper(b, encoding="hex_codec")
2038
2039    def test_detach(self):
2040        r = self.BytesIO()
2041        b = self.BufferedWriter(r)
2042        t = self.TextIOWrapper(b)
2043        self.assertIs(t.detach(), b)
2044
2045        t = self.TextIOWrapper(b, encoding="ascii")
2046        t.write("howdy")
2047        self.assertFalse(r.getvalue())
2048        t.detach()
2049        self.assertEqual(r.getvalue(), b"howdy")
2050        self.assertRaises(ValueError, t.detach)
2051
2052        # Operations independent of the detached stream should still work
2053        repr(t)
2054        self.assertEqual(t.encoding, "ascii")
2055        self.assertEqual(t.errors, "strict")
2056        self.assertFalse(t.line_buffering)
2057
    def test_repr(self):
        # repr() of a wrapper shows its encoding, plus the name of the
        # underlying raw stream once that stream has one.
        raw = self.BytesIO("hello".encode("utf-8"))
        b = self.BufferedReader(raw)
        t = self.TextIOWrapper(b, encoding="utf-8")
        modname = self.TextIOWrapper.__module__
        # No name set yet: only the encoding is shown.
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
        # This file uses unicode_literals, so "dummy" is a unicode string and
        # is shown with the u'' prefix.
        raw.name = "dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
        # A byte-string name is shown without the prefix.
        raw.name = b"dummy"
        self.assertEqual(repr(t),
                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)

        # Detach the raw stream from the buffer underneath the wrapper.
        t.buffer.detach()
        repr(t)  # Should not raise an exception
2074
2075    def test_line_buffering(self):
2076        r = self.BytesIO()
2077        b = self.BufferedWriter(r, 1000)
2078        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2079        t.write("X")
2080        self.assertEqual(r.getvalue(), b"")  # No flush happened
2081        t.write("Y\nZ")
2082        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2083        t.write("A\rB")
2084        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2085
2086    def test_encoding(self):
2087        # Check the encoding attribute is always set, and valid
2088        b = self.BytesIO()
2089        t = self.TextIOWrapper(b, encoding="utf8")
2090        self.assertEqual(t.encoding, "utf8")
2091        t = self.TextIOWrapper(b)
2092        self.assertIsNotNone(t.encoding)
2093        codecs.lookup(t.encoding)
2094
2095    def test_encoding_errors_reading(self):
2096        # (1) default
2097        b = self.BytesIO(b"abc\n\xff\n")
2098        t = self.TextIOWrapper(b, encoding="ascii")
2099        self.assertRaises(UnicodeError, t.read)
2100        # (2) explicit strict
2101        b = self.BytesIO(b"abc\n\xff\n")
2102        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2103        self.assertRaises(UnicodeError, t.read)
2104        # (3) ignore
2105        b = self.BytesIO(b"abc\n\xff\n")
2106        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2107        self.assertEqual(t.read(), "abc\n\n")
2108        # (4) replace
2109        b = self.BytesIO(b"abc\n\xff\n")
2110        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2111        self.assertEqual(t.read(), "abc\n\ufffd\n")
2112
2113    def test_encoding_errors_writing(self):
2114        # (1) default
2115        b = self.BytesIO()
2116        t = self.TextIOWrapper(b, encoding="ascii")
2117        self.assertRaises(UnicodeError, t.write, "\xff")
2118        # (2) explicit strict
2119        b = self.BytesIO()
2120        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2121        self.assertRaises(UnicodeError, t.write, "\xff")
2122        # (3) ignore
2123        b = self.BytesIO()
2124        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2125                             newline="\n")
2126        t.write("abc\xffdef\n")
2127        t.flush()
2128        self.assertEqual(b.getvalue(), b"abcdef\n")
2129        # (4) replace
2130        b = self.BytesIO()
2131        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2132                             newline="\n")
2133        t.write("abc\xffdef\n")
2134        t.flush()
2135        self.assertEqual(b.getvalue(), b"abc?def\n")
2136
2137    def test_newlines(self):
2138        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2139
2140        tests = [
2141            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2142            [ '', input_lines ],
2143            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2144            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2145            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2146        ]
2147        encodings = (
2148            'utf-8', 'latin-1',
2149            'utf-16', 'utf-16-le', 'utf-16-be',
2150            'utf-32', 'utf-32-le', 'utf-32-be',
2151        )
2152
2153        # Try a range of buffer sizes to test the case where \r is the last
2154        # character in TextIOWrapper._pending_line.
2155        for encoding in encodings:
2156            # XXX: str.encode() should return bytes
2157            data = bytes(''.join(input_lines).encode(encoding))
2158            for do_reads in (False, True):
2159                for bufsize in range(1, 10):
2160                    for newline, exp_lines in tests:
2161                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2162                        textio = self.TextIOWrapper(bufio, newline=newline,
2163                                                  encoding=encoding)
2164                        if do_reads:
2165                            got_lines = []
2166                            while True:
2167                                c2 = textio.read(2)
2168                                if c2 == '':
2169                                    break
2170                                self.assertEqual(len(c2), 2)
2171                                got_lines.append(c2 + textio.readline())
2172                        else:
2173                            got_lines = list(textio)
2174
2175                        for got_line, exp_line in zip(got_lines, exp_lines):
2176                            self.assertEqual(got_line, exp_line)
2177                        self.assertEqual(len(got_lines), len(exp_lines))
2178
2179    def test_newlines_input(self):
2180        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2181        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2182        for newline, expected in [
2183            (None, normalized.decode("ascii").splitlines(True)),
2184            ("", testdata.decode("ascii").splitlines(True)),
2185            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2186            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2187            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2188            ]:
2189            buf = self.BytesIO(testdata)
2190            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2191            self.assertEqual(txt.readlines(), expected)
2192            txt.seek(0)
2193            self.assertEqual(txt.read(), "".join(expected))
2194
2195    def test_newlines_output(self):
2196        testdict = {
2197            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2198            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2199            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2200            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2201            }
2202        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2203        for newline, expected in tests:
2204            buf = self.BytesIO()
2205            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2206            txt.write("AAA\nB")
2207            txt.write("BB\nCCC\n")
2208            txt.write("X\rY\r\nZ")
2209            txt.flush()
2210            self.assertEqual(buf.closed, False)
2211            self.assertEqual(buf.getvalue(), expected)
2212
2213    def test_destructor(self):
2214        l = []
2215        base = self.BytesIO
2216        class MyBytesIO(base):
2217            def close(self):
2218                l.append(self.getvalue())
2219                base.close(self)
2220        b = MyBytesIO()
2221        t = self.TextIOWrapper(b, encoding="ascii")
2222        t.write("abc")
2223        del t
2224        support.gc_collect()
2225        self.assertEqual([b"abc"], l)
2226
    def test_override_destructor(self):
        # A subclass overriding __del__, close() and flush() must see them
        # called in that order when the object is destroyed.
        record = []
        class MyTextIO(self.TextIOWrapper):
            def __del__(self):
                record.append(1)
                # Chain to the parent's __del__ only if it defines one.
                try:
                    f = super(MyTextIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyTextIO, self).close()
            def flush(self):
                record.append(3)
                super(MyTextIO, self).flush()
        b = self.BytesIO()
        t = MyTextIO(b, encoding="ascii")
        del t
        support.gc_collect()
        # 1 = __del__, 2 = close, 3 = flush
        self.assertEqual(record, [1, 2, 3])
2249
    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # The attribute lookup fails on a temporary wrapper, which is
            # then destroyed while the AttributeError is propagating.
            self.TextIOWrapper(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)
2264
2265    # Systematic tests of the text I/O API
2266
2267    def test_basic_io(self):
2268        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2269            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
2270                f = self.open(support.TESTFN, "w+", encoding=enc)
2271                f._CHUNK_SIZE = chunksize
2272                self.assertEqual(f.write("abc"), 3)
2273                f.close()
2274                f = self.open(support.TESTFN, "r+", encoding=enc)
2275                f._CHUNK_SIZE = chunksize
2276                self.assertEqual(f.tell(), 0)
2277                self.assertEqual(f.read(), "abc")
2278                cookie = f.tell()
2279                self.assertEqual(f.seek(0), 0)
2280                self.assertEqual(f.read(None), "abc")
2281                f.seek(0)
2282                self.assertEqual(f.read(2), "ab")
2283                self.assertEqual(f.read(1), "c")
2284                self.assertEqual(f.read(1), "")
2285                self.assertEqual(f.read(), "")
2286                self.assertEqual(f.tell(), cookie)
2287                self.assertEqual(f.seek(0), 0)
2288                self.assertEqual(f.seek(0, 2), cookie)
2289                self.assertEqual(f.write("def"), 3)
2290                self.assertEqual(f.seek(cookie), cookie)
2291                self.assertEqual(f.read(), "def")
2292                if enc.startswith("utf"):
2293                    self.multi_line_test(f, enc)
2294                f.close()
2295
2296    def multi_line_test(self, f, enc):
2297        f.seek(0)
2298        f.truncate()
2299        sample = "s\xff\u0fff\uffff"
2300        wlines = []
2301        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2302            chars = []
2303            for i in range(size):
2304                chars.append(sample[i % len(sample)])
2305            line = "".join(chars) + "\n"
2306            wlines.append((f.tell(), line))
2307            f.write(line)
2308        f.seek(0)
2309        rlines = []
2310        while True:
2311            pos = f.tell()
2312            line = f.readline()
2313            if not line:
2314                break
2315            rlines.append((pos, line))
2316        self.assertEqual(rlines, wlines)
2317
2318    def test_telling(self):
2319        f = self.open(support.TESTFN, "w+", encoding="utf8")
2320        p0 = f.tell()
2321        f.write("\xff\n")
2322        p1 = f.tell()
2323        f.write("\xff\n")
2324        p2 = f.tell()
2325        f.seek(0)
2326        self.assertEqual(f.tell(), p0)
2327        self.assertEqual(f.readline(), "\xff\n")
2328        self.assertEqual(f.tell(), p1)
2329        self.assertEqual(f.readline(), "\xff\n")
2330        self.assertEqual(f.tell(), p2)
2331        f.seek(0)
2332        for line in f:
2333            self.assertEqual(line, "\xff\n")
2334            self.assertRaises(IOError, f.tell)
2335        self.assertEqual(f.tell(), p2)
2336        f.close()
2337
2338    def test_seeking(self):
2339        chunk_size = _default_chunk_size()
2340        prefix_size = chunk_size - 2
2341        u_prefix = "a" * prefix_size
2342        prefix = bytes(u_prefix.encode("utf-8"))
2343        self.assertEqual(len(u_prefix), len(prefix))
2344        u_suffix = "\u8888\n"
2345        suffix = bytes(u_suffix.encode("utf-8"))
2346        line = prefix + suffix
2347        f = self.open(support.TESTFN, "wb")
2348        f.write(line*2)
2349        f.close()
2350        f = self.open(support.TESTFN, "r", encoding="utf-8")
2351        s = f.read(prefix_size)
2352        self.assertEqual(s, prefix.decode("ascii"))
2353        self.assertEqual(f.tell(), prefix_size)
2354        self.assertEqual(f.readline(), u_suffix)
2355
2356    def test_seeking_too(self):
2357        # Regression test for a specific bug
2358        data = b'\xe0\xbf\xbf\n'
2359        f = self.open(support.TESTFN, "wb")
2360        f.write(data)
2361        f.close()
2362        f = self.open(support.TESTFN, "r", encoding="utf-8")
2363        f._CHUNK_SIZE  # Just test that it exists
2364        f._CHUNK_SIZE = 2
2365        f.readline()
2366        f.tell()
2367
    def test_seek_and_tell(self):
        # Test seek/tell using the StatefulIncrementalDecoder.
        # Make test faster by doing smaller seeks
        CHUNK_SIZE = 128

        def test_seek_and_tell_with_data(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent.

            data is written to the test file and decoded with 'test_decoder';
            seek positions below min_pos are not tried."""
            f = self.open(support.TESTFN, 'wb')
            f.write(data)
            f.close()
            # Reference decoding of the whole file in one read().
            f = self.open(support.TESTFN, encoding='test_decoder')
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()
            f.close()

            for i in range(min_pos, len(decoded) + 1): # seek positions
                for j in [1, 5, len(decoded) - i]: # read lengths
                    f = self.open(support.TESTFN, encoding='test_decoder')
                    self.assertEqual(f.read(i), decoded[:i])
                    # Remember the position, read past it, then come back.
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])
                    f.close()

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = 1

        # Run the tests.
        try:
            # Try each test case.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                test_seek_and_tell_with_data(input)

            # Position each test case so that it crosses a chunk boundary.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(input)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                test_seek_and_tell_with_data(prefix + input, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = 0
2414
2415    def test_encoded_writes(self):
2416        data = "1234567890"
2417        tests = ("utf-16",
2418                 "utf-16-le",
2419                 "utf-16-be",
2420                 "utf-32",
2421                 "utf-32-le",
2422                 "utf-32-be")
2423        for encoding in tests:
2424            buf = self.BytesIO()
2425            f = self.TextIOWrapper(buf, encoding=encoding)
2426            # Check if the BOM is written only once (see issue1753).
2427            f.write(data)
2428            f.write(data)
2429            f.seek(0)
2430            self.assertEqual(f.read(), data * 2)
2431            f.seek(0)
2432            self.assertEqual(f.read(), data * 2)
2433            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
2434
2435    def test_unreadable(self):
2436        class UnReadable(self.BytesIO):
2437            def readable(self):
2438                return False
2439        txt = self.TextIOWrapper(UnReadable())
2440        self.assertRaises(IOError, txt.read)
2441
2442    def test_read_one_by_one(self):
2443        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
2444        reads = ""
2445        while True:
2446            c = txt.read(1)
2447            if not c:
2448                break
2449            reads += c
2450        self.assertEqual(reads, "AA\nBB")
2451
2452    def test_readlines(self):
2453        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
2454        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
2455        txt.seek(0)
2456        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
2457        txt.seek(0)
2458        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
2459
2460    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
2461    def test_read_by_chunk(self):
2462        # make sure "\r\n" straddles 128 char boundary.
2463        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
2464        reads = ""
2465        while True:
2466            c = txt.read(128)
2467            if not c:
2468                break
2469            reads += c
2470        self.assertEqual(reads, "A"*127+"\nB")
2471
2472    def test_writelines(self):
2473        l = ['ab', 'cd', 'ef']
2474        buf = self.BytesIO()
2475        txt = self.TextIOWrapper(buf)
2476        txt.writelines(l)
2477        txt.flush()
2478        self.assertEqual(buf.getvalue(), b'abcdef')
2479
2480    def test_writelines_userlist(self):
2481        l = UserList(['ab', 'cd', 'ef'])
2482        buf = self.BytesIO()
2483        txt = self.TextIOWrapper(buf)
2484        txt.writelines(l)
2485        txt.flush()
2486        self.assertEqual(buf.getvalue(), b'abcdef')
2487
2488    def test_writelines_error(self):
2489        txt = self.TextIOWrapper(self.BytesIO())
2490        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
2491        self.assertRaises(TypeError, txt.writelines, None)
2492        self.assertRaises(TypeError, txt.writelines, b'abc')
2493
2494    def test_issue1395_1(self):
2495        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2496
2497        # read one char at a time
2498        reads = ""
2499        while True:
2500            c = txt.read(1)
2501            if not c:
2502                break
2503            reads += c
2504        self.assertEqual(reads, self.normalized)
2505
2506    def test_issue1395_2(self):
2507        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2508        txt._CHUNK_SIZE = 4
2509
2510        reads = ""
2511        while True:
2512            c = txt.read(4)
2513            if not c:
2514                break
2515            reads += c
2516        self.assertEqual(reads, self.normalized)
2517
2518    def test_issue1395_3(self):
2519        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2520        txt._CHUNK_SIZE = 4
2521
2522        reads = txt.read(4)
2523        reads += txt.read(4)
2524        reads += txt.readline()
2525        reads += txt.readline()
2526        reads += txt.readline()
2527        self.assertEqual(reads, self.normalized)
2528
2529    def test_issue1395_4(self):
2530        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2531        txt._CHUNK_SIZE = 4
2532
2533        reads = txt.read(4)
2534        reads += txt.read()
2535        self.assertEqual(reads, self.normalized)
2536
2537    def test_issue1395_5(self):
2538        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2539        txt._CHUNK_SIZE = 4
2540
2541        reads = txt.read(4)
2542        pos = txt.tell()
2543        txt.seek(0)
2544        txt.seek(pos)
2545        self.assertEqual(txt.read(4), "BBB\n")
2546
2547    def test_issue2282(self):
2548        buffer = self.BytesIO(self.testdata)
2549        txt = self.TextIOWrapper(buffer, encoding="ascii")
2550
2551        self.assertEqual(buffer.seekable(), txt.seekable())
2552
2553    def test_append_bom(self):
2554        # The BOM is not written again when appending to a non-empty file
2555        filename = support.TESTFN
2556        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2557            with self.open(filename, 'w', encoding=charset) as f:
2558                f.write('aaa')
2559                pos = f.tell()
2560            with self.open(filename, 'rb') as f:
2561                self.assertEqual(f.read(), 'aaa'.encode(charset))
2562
2563            with self.open(filename, 'a', encoding=charset) as f:
2564                f.write('xxx')
2565            with self.open(filename, 'rb') as f:
2566                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2567
2568    def test_seek_bom(self):
2569        # Same test, but when seeking manually
2570        filename = support.TESTFN
2571        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
2572            with self.open(filename, 'w', encoding=charset) as f:
2573                f.write('aaa')
2574                pos = f.tell()
2575            with self.open(filename, 'r+', encoding=charset) as f:
2576                f.seek(pos)
2577                f.write('zzz')
2578                f.seek(0)
2579                f.write('bbb')
2580            with self.open(filename, 'rb') as f:
2581                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2582
2583    def test_errors_property(self):
2584        with self.open(support.TESTFN, "w") as f:
2585            self.assertEqual(f.errors, "strict")
2586        with self.open(support.TESTFN, "w", errors="replace") as f:
2587            self.assertEqual(f.errors, "replace")
2588
2589    @unittest.skipUnless(threading, 'Threading required for this test.')
2590    def test_threads_write(self):
2591        # Issue6750: concurrent writes could duplicate data
2592        event = threading.Event()
2593        with self.open(support.TESTFN, "w", buffering=1) as f:
2594            def run(n):
2595                text = "Thread%03d\n" % n
2596                event.wait()
2597                f.write(text)
2598            threads = [threading.Thread(target=run, args=(x,))
2599                       for x in range(20)]
2600            with support.start_threads(threads, event.set):
2601                time.sleep(0.02)
2602        with self.open(support.TESTFN) as f:
2603            content = f.read()
2604            for n in range(20):
2605                self.assertEqual(content.count("Thread%03d\n" % n), 1)
2606
2607    def test_flush_error_on_close(self):
2608        # Test that text file is closed despite failed flush
2609        # and that flush() is called before file closed.
2610        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2611        closed = []
2612        def bad_flush():
2613            closed[:] = [txt.closed, txt.buffer.closed]
2614            raise IOError()
2615        txt.flush = bad_flush
2616        self.assertRaises(IOError, txt.close) # exception not swallowed
2617        self.assertTrue(txt.closed)
2618        self.assertTrue(txt.buffer.closed)
2619        self.assertTrue(closed)      # flush() called
2620        self.assertFalse(closed[0])  # flush() called before file closed
2621        self.assertFalse(closed[1])
2622        txt.flush = lambda: None  # break reference loop
2623
2624    def test_multi_close(self):
2625        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2626        txt.close()
2627        txt.close()
2628        txt.close()
2629        self.assertRaises(ValueError, txt.flush)
2630
2631    def test_readonly_attributes(self):
2632        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
2633        buf = self.BytesIO(self.testdata)
2634        with self.assertRaises((AttributeError, TypeError)):
2635            txt.buffer = buf
2636
2637    def test_read_nonbytes(self):
2638        # Issue #17106
2639        # Crash when underlying read() returns non-bytes
2640        class NonbytesStream(self.StringIO):
2641            read1 = self.StringIO.read
2642        class NonbytesStream(self.StringIO):
2643            read1 = self.StringIO.read
2644        t = self.TextIOWrapper(NonbytesStream('a'))
2645        with self.maybeRaises(TypeError):
2646            t.read(1)
2647        t = self.TextIOWrapper(NonbytesStream('a'))
2648        with self.maybeRaises(TypeError):
2649            t.readline()
2650        t = self.TextIOWrapper(NonbytesStream('a'))
2651        self.assertEqual(t.read(), u'a')
2652
2653    def test_illegal_decoder(self):
2654        # Issue #17106
2655        # Bypass the early encoding check added in issue 20404
2656        def _make_illegal_wrapper():
2657            quopri = codecs.lookup("quopri_codec")
2658            quopri._is_text_encoding = True
2659            try:
2660                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
2661                                       newline='\n', encoding="quopri_codec")
2662            finally:
2663                quopri._is_text_encoding = False
2664            return t
2665        # Crash when decoder returns non-string
2666        with support.check_py3k_warnings():
2667            t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2668                                   encoding='quopri_codec')
2669        with self.maybeRaises(TypeError):
2670            t.read(1)
2671        with support.check_py3k_warnings():
2672            t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2673                                   encoding='quopri_codec')
2674        with self.maybeRaises(TypeError):
2675            t.readline()
2676        with support.check_py3k_warnings():
2677            t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
2678                                   encoding='quopri_codec')
2679        with self.maybeRaises(TypeError):
2680            t.read()
2681        #else:
2682            #t = _make_illegal_wrapper()
2683            #self.assertRaises(TypeError, t.read, 1)
2684            #t = _make_illegal_wrapper()
2685            #self.assertRaises(TypeError, t.readline)
2686            #t = _make_illegal_wrapper()
2687            #self.assertRaises(TypeError, t.read)
2688
2689
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # After a failed re-initialisation the wrapper must refuse to
        # read (ValueError) instead of crashing.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        for init_error, bad_newline in ((TypeError, 42),
                                        (ValueError, 'xyzzy')):
            self.assertRaises(init_error, wrapper.__init__, buffered,
                              newline=bad_newline)
            self.assertRaises(ValueError, wrapper.read)

        # repr() of an object that never went through __init__ raises.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        raw = io.FileIO(support.TESTFN, "wb")
        buffered = self.BufferedWriter(raw)
        wrapper = self.TextIOWrapper(buffered, encoding="ascii")
        wrapper.write("456def")
        # Put the wrapper in a reference cycle so that only the garbage
        # collector can reclaim it.
        wrapper.x = wrapper
        ref = weakref.ref(wrapper)
        del wrapper
        support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        def make_wrapper():
            pair = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            return self.TextIOWrapper(pair, encoding="ascii")

        for _ in range(1000):
            first = make_wrapper()
            second = make_wrapper()
            # circular references
            first.buddy = second
            second.buddy = first
        support.gc_collect()

    # The C implementation is expected to raise where the shared tests use
    # maybeRaises().
    maybeRaises = unittest.TestCase.assertRaises
2735
2736
class PyTextIOWrapperTest(TextIOWrapperTest):
    # No-op stand-in for assertRaises(): the arguments are ignored and the
    # body of the ``with`` block simply runs; no exception is required.
    @contextlib.contextmanager
    def maybeRaises(self, *args, **kwds):
        yield
2741
2742
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def check(chunk, expected, **kwargs):
            # Decode the chunk twice from the same saved state, so that
            # getstate() / setstate() are exercised as well as decode().
            saved = decoder.getstate()
            first = decoder.decode(chunk, **kwargs)
            self.assertEqual(first, expected)
            decoder.setstate(saved)
            second = decoder.decode(chunk, **kwargs)
            self.assertEqual(second, expected)

        def check_each(pairs):
            for chunk, expected in pairs:
                check(chunk, expected)

        # One three-byte character, whole and then twice byte by byte.
        split_char = [(b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888")]
        check(b'\xe8\xa2\x88', "\u8888")
        check_each(split_char)
        check_each(split_char)

        # A character left incomplete at the end of the input is an error.
        check(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        check(b'\n', "\n")
        check(b'\r', "")
        check(b'', "\n", final=True)
        check(b'\r', "\n", final=True)

        check_each([
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ])

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to `decoder` unit by unit and follow how its
        # `newlines` attribute evolves.  With `encoding` set to None the
        # decoder receives text directly, otherwise encoded data.
        decoded = []
        encoder = None
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()

        def feed(text):
            # One item of the encoder output at a time when an encoding is
            # used, one character at a time otherwise.
            units = text if encoder is None else encoder.encode(text)
            for unit in units:
                decoded.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, newlines_seen in steps:
            feed(text)
            self.assertEqual(decoder.newlines, newlines_seen)
        feed("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")

        # After reset() the newline bookkeeping starts from scratch.
        decoder.reset()
        final_input = "abc"
        if encoder is not None:
            encoder.reset()
            final_input = encoder.encode(final_input)
        self.assertEqual(decoder.decode(final_input), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = codecs.getincrementaldecoder(enc)() if enc else None
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)
        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        utf8_wrapped = self.IncrementalNewlineDecoder(utf8_inner,
                                                      translate=True)
        self.check_newline_decoding_utf8(utf8_wrapped)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # Neither character may be taken for a newline.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)
        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))
2848
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # No overrides: every test is inherited from
    # IncrementalNewlineDecoderTest.
    # NOTE(review): the ``IncrementalNewlineDecoder`` attribute used by the
    # inherited tests is not set here -- presumably injected elsewhere in
    # this file for the C implementation; confirm.
    pass
2851
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # No overrides: every test is inherited from
    # IncrementalNewlineDecoderTest.
    # NOTE(review): the ``IncrementalNewlineDecoder`` attribute used by the
    # inherited tests is not set here -- presumably injected elsewhere in
    # this file for the Python implementation; confirm.
    pass
2854
2855
2856# XXX Tests for open()
2857
class MiscIOTest(unittest.TestCase):

    def tearDown(self):
        # Remove the scratch file the tests may have created.
        support.unlink(support.TESTFN)

    def test___all__(self):
        # Every exported name must exist.  Apart from open() and the
        # SEEK_* constants it is either an exception or an IOBase subclass.
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertIsNotNone(obj, name)
            if name == "open":
                continue
            if "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
                continue
            if name.startswith("SEEK_"):
                continue
            self.assertTrue(issubclass(obj, self.IOBase))

    def test_attributes(self):
        # `name` and `mode` as reported by each layer of the stack.
        raw = self.open(support.TESTFN, "wb", buffering=0)
        self.assertEqual(raw.mode, "wb")
        raw.close()

        text = self.open(support.TESTFN, "U")
        layers = (text, text.buffer, text.buffer.raw)
        for layer in layers:
            self.assertEqual(layer.name, support.TESTFN)
        for layer, expected_mode in zip(layers, ("U", "rb", "rb")):
            self.assertEqual(layer.mode, expected_mode)
        text.close()

        text = self.open(support.TESTFN, "w+")
        self.assertEqual(text.mode, "w+")
        self.assertEqual(text.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(text.buffer.raw.mode, "rb+")

        # A second file object on the same descriptor reports the
        # descriptor number as its name.
        fd = text.fileno()
        dup = self.open(fd, "wb", closefd=False)
        self.assertEqual(dup.mode, "wb")
        self.assertEqual(dup.raw.mode, "wb")
        self.assertEqual(dup.name, fd)
        self.assertEqual(dup.raw.name, fd)
        text.close()
        dup.close()

    def test_io_after_close(self):
        # Every I/O method of a closed file raises ValueError, whatever
        # the mode and buffering it was opened with.
        variants = []
        for text_mode, binary_mode in (("w", "wb"), ("r", "rb"),
                                       ("w+", "w+b")):
            variants.extend([
                {"mode": text_mode},
                {"mode": binary_mode},
                {"mode": text_mode, "buffering": 1},
                {"mode": text_mode, "buffering": 2},
                {"mode": binary_mode, "buffering": 0},
            ])

        def rejects(func, *args):
            self.assertRaises(ValueError, func, *args)

        for kwargs in variants:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            for method in ("flush", "fileno", "isatty", "__iter__"):
                rejects(getattr(f, method))
            # peek, read1, readall and readinto only exist on some layers.
            if hasattr(f, "peek"):
                rejects(f.peek, 1)
            rejects(f.read)
            if hasattr(f, "read1"):
                rejects(f.read1, 1024)
            if hasattr(f, "readall"):
                rejects(f.readall)
            if hasattr(f, "readinto"):
                rejects(f.readinto, bytearray(1024))
            rejects(f.readline)
            rejects(f.readlines)
            rejects(f.seek, 0)
            rejects(f.tell)
            rejects(f.truncate)
            # write() needs bytes for binary files and text otherwise.
            rejects(f.write, b"" if "b" in kwargs['mode'] else "")
            rejects(f.writelines, [])
            rejects(next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        for bad_args in ((), (1,), (1, 2, 3, 4), (1, "", None)):
            self.assertRaises(TypeError, self.BlockingIOError, *bad_args)
        err = self.BlockingIOError(1, "")
        self.assertEqual(err.characters_written, 0)
        class C(unicode):
            pass
        # Build a reference cycle between the exception and its message;
        # the garbage collector must be able to reclaim it.
        message = C("")
        err = self.BlockingIOError(1, message)
        message.b = err
        err.c = message
        ref = weakref.ref(message)
        del message, err
        support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        for base in (self.IOBase, self.RawIOBase,
                     self.BufferedIOBase, self.TextIOBase):
            self.assertIsInstance(base, abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # A file of each kind (raw, buffered, text) is an IOBase and an
        # instance of exactly one of the three more specific base classes
        # looked up on `abcmodule`.
        layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
        cases = (
            ("wb", {"buffering": 0}, "RawIOBase"),
            ("wb", {}, "BufferedIOBase"),
            ("w", {}, "TextIOBase"),
        )
        for mode, extra, own_layer in cases:
            with self.open(support.TESTFN, mode, **extra) as f:
                self.assertIsInstance(f, abcmodule.IOBase)
                for layer in layers:
                    base = getattr(abcmodule, layer)
                    if layer == own_layer:
                        self.assertIsInstance(f, base)
                    else:
                        self.assertNotIsInstance(f, base)

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    @unittest.skipUnless(fcntl, 'fcntl required for this test')
    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    def _set_non_blocking(self, fd):
        # Add O_NONBLOCK to the status flags of `fd`.
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        self.assertNotEqual(current, -1)
        status = fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
        self.assertEqual(status, 0)

    def _test_nonblock_pipe_write(self, bufsize):
        # Push data through a non-blocking pipe with buffers of `bufsize`
        # bytes on both ends and check that everything accepted by the
        # writer comes out of the reader unchanged.
        sent = []
        received = []
        read_fd, write_fd = os.pipe()
        self._set_non_blocking(read_fd)
        self._set_non_blocking(write_fd)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(read_fd, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(write_fd, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for size in (9999, 73, 7574):
                try:
                    # Write until the pipe refuses to take more.
                    i = 0
                    while True:
                        msg = bytes([i % 26 + 97]) * size
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    # Only part of the last message was accepted.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Flush what is still buffered, draining the pipe whenever the
            # flush would block.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Collect the remaining data until read() returns None.
            received.extend(iter(rf.read, None))

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
3057
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases with ``self.io`` bound to the C
    # implementation (the module imported as ``io`` at the top of the file).
    io = io
    # NOTE(review): not referenced in the visible part of this file --
    # presumably the message expected by a shutdown-related test; confirm.
    shutdown_error = "RuntimeError: could not find io module state"
3061
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases with ``self.io`` bound to the Python
    # implementation (``_pyio``, imported as ``pyio`` at the top of the file).
    io = pyio
    # NOTE(review): not referenced in the visible part of this file --
    # presumably the message expected by a shutdown-related test; confirm.
    shutdown_error = "LookupError: unknown encoding: ascii"
3065
3066
3067@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
3068class SignalsTest(unittest.TestCase):
3069
    def setUp(self):
        # Install alarm_interrupt() as the SIGALRM handler and remember the
        # handler that was active before, so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
3072
    def tearDown(self):
        # Put back the SIGALRM handler saved by setUp().
        signal.signal(signal.SIGALRM, self.oldalrm)
3075
    def alarm_interrupt(self, sig, frame):
        # SIGALRM handler installed by setUp(): deliberately raises
        # ZeroDivisionError so that a test can tell the handler really ran.
        1 // 0
3078
3079    @unittest.skipUnless(threading, 'Threading required for this test.')
3080    @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
3081                     'issue #12429: skip test on FreeBSD <= 7')
3082    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
3083        """Check that a partial write, when it gets interrupted, properly
3084        invokes the signal handler, and bubbles up the exception raised
3085        in the latter."""
3086        read_results = []
3087        def _read():
3088            s = os.read(r, 1)
3089            read_results.append(s)
3090        t = threading.Thread(target=_read)
3091        t.daemon = True
3092        r, w = os.pipe()
3093        try:
3094            wio = self.io.open(w, **fdopen_kwargs)
3095            t.start()
3096            signal.alarm(1)
3097            # Fill the pipe enough that the write will be blocking.
3098            # It will be interrupted by the timer armed above.  Since the
3099            # other thread has read one byte, the low-level write will
3100            # return with a successful (partial) result rather than an EINTR.
3101            # The buffered IO layer must check for pending signal
3102            # handlers, which in this case will invoke alarm_interrupt().
3103            try:
3104                with self.assertRaises(ZeroDivisionError):
3105                    wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
3106            finally:
3107                t.join()
3108            # We got one byte, get another one and check that it isn't a
3109            # repeat of the first one.
3110            read_results.append(os.read(r, 1))
3111            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
3112        finally:
3113            os.close(w)
3114            os.close(r)
3115            # This is deliberate. If we didn't close the file descriptor
3116            # before closing wio, wio would try to flush its internal
3117            # buffer, and block again.
3118            try:
3119                wio.close()
3120            except IOError as e:
3121                if e.errno != errno.EBADF:
3122                    raise
3123
3124    def test_interrupted_write_unbuffered(self):
3125        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
3126
3127    def test_interrupted_write_buffered(self):
3128        self.check_interrupted_write(b"xy", b"xy", mode="wb")
3129
3130    def test_interrupted_write_text(self):
3131        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
3132
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of writing to a file from a SIGALRM handler
        while the interrupted code is itself writing to the same file.

        `data` is written repeatedly to the write end of a pipe opened
        with `fdopen_kwargs`.  The loop must end with either the handler's
        ZeroDivisionError or a RuntimeError whose message starts with
        "reentrant call".
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1//0
        # Replaces the handler installed by setUp() for this test.
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Only the read end is closed explicitly here; the write end is
            # left to wio.close().
            wio.close()
            os.close(r)
3158
3159    def test_reentrant_write_buffered(self):
3160        self.check_reentrant_write(b"xy", mode="wb")
3161
3162    def test_reentrant_write_text(self):
3163        self.check_reentrant_write("xy", mode="w", encoding="ascii")
3164
3165    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
3166        """Check that a buffered read, when it gets interrupted (either
3167        returning a partial result or EINTR), properly invokes the signal
3168        handler and retries if the latter returned successfully."""
3169        r, w = os.pipe()
3170        fdopen_kwargs["closefd"] = False
3171        def alarm_handler(sig, frame):
3172            os.write(w, b"bar")
3173        signal.signal(signal.SIGALRM, alarm_handler)
3174        try:
3175            rio = self.io.open(r, **fdopen_kwargs)
3176            os.write(w, b"foo")
3177            signal.alarm(1)
3178            # Expected behaviour:
3179            # - first raw read() returns partial b"foo"
3180            # - second raw read() returns EINTR
3181            # - third raw read() returns b"bar"
3182            self.assertEqual(decode(rio.read(6)), "foobar")
3183        finally:
3184            rio.close()
3185            os.close(w)
3186            os.close(r)
3187
3188    def test_interrupterd_read_retry_buffered(self):
3189        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
3190                                          mode="rb")
3191
3192    def test_interrupterd_read_retry_text(self):
3193        self.check_interrupted_read_retry(lambda x: x,
3194                                          mode="r")
3195
3196    @unittest.skipUnless(threading, 'Threading required for this test.')
3197    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
3198        """Check that a buffered write, when it gets interrupted (either
3199        returning a partial result or EINTR), properly invokes the signal
3200        handler and retries if the latter returned successfully."""
3201        select = support.import_module("select")
3202        # A quantity that exceeds the buffer size of an anonymous pipe's
3203        # write end.
3204        N = support.PIPE_MAX_SIZE
3205        r, w = os.pipe()
3206        fdopen_kwargs["closefd"] = False
3207        # We need a separate thread to read from the pipe and allow the
3208        # write() to finish.  This thread is started after the SIGALRM is
3209        # received (forcing a first EINTR in write()).
3210        read_results = []
3211        write_finished = False
3212        error = [None]
3213        def _read():
3214            try:
3215                while not write_finished:
3216                    while r in select.select([r], [], [], 1.0)[0]:
3217                        s = os.read(r, 1024)
3218                        read_results.append(s)
3219            except BaseException as exc:
3220                error[0] = exc
3221        t = threading.Thread(target=_read)
3222        t.daemon = True
3223        def alarm1(sig, frame):
3224            signal.signal(signal.SIGALRM, alarm2)
3225            signal.alarm(1)
3226        def alarm2(sig, frame):
3227            t.start()
3228        signal.signal(signal.SIGALRM, alarm1)
3229        try:
3230            wio = self.io.open(w, **fdopen_kwargs)
3231            signal.alarm(1)
3232            # Expected behaviour:
3233            # - first raw write() is partial (because of the limited pipe buffer
3234            #   and the first alarm)
3235            # - second raw write() returns EINTR (because of the second alarm)
3236            # - subsequent write()s are successful (either partial or complete)
3237            self.assertEqual(N, wio.write(item * N))
3238            wio.flush()
3239            write_finished = True
3240            t.join()
3241
3242            self.assertIsNone(error[0])
3243            self.assertEqual(N, sum(len(x) for x in read_results))
3244        finally:
3245            write_finished = True
3246            os.close(w)
3247            os.close(r)
3248            # This is deliberate. If we didn't close the file descriptor
3249            # before closing wio, wio would try to flush its internal
3250            # buffer, and could block (in case of failure).
3251            try:
3252                wio.close()
3253            except IOError as e:
3254                if e.errno != errno.EBADF:
3255                    raise
3256
3257    def test_interrupterd_write_retry_buffered(self):
3258        self.check_interrupted_write_retry(b"x", mode="wb")
3259
3260    def test_interrupterd_write_retry_text(self):
3261        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
3262
3263
class CSignalsTest(SignalsTest):
    # Run the inherited SignalsTest cases against the C implementation of io
    # (the module imported as `io` at the top of this file).
    io = io
3266
class PySignalsTest(SignalsTest):
    # Run the inherited SignalsTest cases against the pure-Python
    # implementation (_pyio).
    io = pyio

    # The reentrant-write tests are switched off for _pyio: handling
    # reentrancy issues would slow it down even more.
    test_reentrant_write_buffered = test_reentrant_write_text = None
3274
3275
def test_main():
    """Attach the io namespaces to the test classes and run them all.

    Classes whose name starts with "C" receive the members of the C io
    module plus the "C"-prefixed mock classes; classes whose name starts
    with "Py" receive the same from _pyio with the "Py"-prefixed mocks.
    Other classes are run without any injected attributes.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockRawIOWithoutRead)
    member_names = io.__all__ + ["IncrementalNewlineDecoder"]
    module_globals = globals()

    def build_namespace(module, prefix):
        # Public members of *module*, plus each mock class under its
        # unprefixed name, resolved to the prefix-specific variant.
        namespace = dict((name, getattr(module, name))
                         for name in member_names)
        for mock in mocks:
            namespace[mock.__name__] = module_globals[prefix + mock.__name__]
        return namespace

    c_namespace = build_namespace(io, "C")
    py_namespace = build_namespace(pyio, "Py")
    # Avoid turning open into a bound method.
    py_namespace["open"] = pyio.OpenWrapper

    namespace_by_prefix = (("C", c_namespace), ("Py", py_namespace))
    for test in tests:
        for prefix, namespace in namespace_by_prefix:
            if test.__name__.startswith(prefix):
                for name, obj in namespace.items():
                    setattr(test, name, obj)
                # Only the first matching prefix applies.
                break

    support.run_unittest(*tests)
3310
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
3313