1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22from __future__ import print_function 23from __future__ import unicode_literals 24 25import os 26import sys 27import time 28import array 29import random 30import unittest 31import weakref 32import warnings 33import abc 34import signal 35import errno 36from itertools import cycle, count 37from collections import deque 38from UserList import UserList 39from test import test_support as support 40import contextlib 41 42import codecs 43import io # C implementation of io 44import _pyio as pyio # Python implementation of io 45try: 46 import threading 47except ImportError: 48 threading = None 49try: 50 import fcntl 51except ImportError: 52 fcntl = None 53 54__metaclass__ = type 55bytes = support.py3k_bytes 56 57def byteslike(*pos, **kw): 58 return memoryview(bytearray(*pos, **kw)) 59 60def _default_chunk_size(): 61 """Get the default TextIOWrapper chunk size""" 62 with io.open(__file__, "r", encoding="latin1") as f: 63 return 
f._CHUNK_SIZE 64 65 66class MockRawIOWithoutRead: 67 """A RawIO implementation without read(), so as to exercise the default 68 RawIO.read() which calls readinto().""" 69 70 def __init__(self, read_stack=()): 71 self._read_stack = list(read_stack) 72 self._write_stack = [] 73 self._reads = 0 74 self._extraneous_reads = 0 75 76 def write(self, b): 77 self._write_stack.append(bytes(b)) 78 return len(b) 79 80 def writable(self): 81 return True 82 83 def fileno(self): 84 return 42 85 86 def readable(self): 87 return True 88 89 def seekable(self): 90 return True 91 92 def seek(self, pos, whence): 93 return 0 # wrong but we gotta return something 94 95 def tell(self): 96 return 0 # same comment as above 97 98 def readinto(self, buf): 99 self._reads += 1 100 max_len = len(buf) 101 try: 102 data = self._read_stack[0] 103 except IndexError: 104 self._extraneous_reads += 1 105 return 0 106 if data is None: 107 del self._read_stack[0] 108 return None 109 n = len(data) 110 if len(data) <= max_len: 111 del self._read_stack[0] 112 buf[:n] = data 113 return n 114 else: 115 buf[:] = data[:max_len] 116 self._read_stack[0] = data[max_len:] 117 return max_len 118 119 def truncate(self, pos=None): 120 return pos 121 122class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 123 pass 124 125class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 126 pass 127 128 129class MockRawIO(MockRawIOWithoutRead): 130 131 def read(self, n=None): 132 self._reads += 1 133 try: 134 return self._read_stack.pop(0) 135 except: 136 self._extraneous_reads += 1 137 return b"" 138 139class CMockRawIO(MockRawIO, io.RawIOBase): 140 pass 141 142class PyMockRawIO(MockRawIO, pyio.RawIOBase): 143 pass 144 145 146class MisbehavedRawIO(MockRawIO): 147 def write(self, b): 148 return MockRawIO.write(self, b) * 2 149 150 def read(self, n=None): 151 return MockRawIO.read(self, n) * 2 152 153 def seek(self, pos, whence): 154 return -123 155 156 def tell(self): 157 return -456 158 159 def 
readinto(self, buf): 160 MockRawIO.readinto(self, buf) 161 return len(buf) * 5 162 163class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 164 pass 165 166class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 167 pass 168 169 170class CloseFailureIO(MockRawIO): 171 closed = 0 172 173 def close(self): 174 if not self.closed: 175 self.closed = 1 176 raise IOError 177 178class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 179 pass 180 181class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 182 pass 183 184 185class MockFileIO: 186 187 def __init__(self, data): 188 self.read_history = [] 189 super(MockFileIO, self).__init__(data) 190 191 def read(self, n=None): 192 res = super(MockFileIO, self).read(n) 193 self.read_history.append(None if res is None else len(res)) 194 return res 195 196 def readinto(self, b): 197 res = super(MockFileIO, self).readinto(b) 198 self.read_history.append(res) 199 return res 200 201class CMockFileIO(MockFileIO, io.BytesIO): 202 pass 203 204class PyMockFileIO(MockFileIO, pyio.BytesIO): 205 pass 206 207 208class MockNonBlockWriterIO: 209 210 def __init__(self): 211 self._write_stack = [] 212 self._blocker_char = None 213 214 def pop_written(self): 215 s = b"".join(self._write_stack) 216 self._write_stack[:] = [] 217 return s 218 219 def block_on(self, char): 220 """Block when a given char is encountered.""" 221 self._blocker_char = char 222 223 def readable(self): 224 return True 225 226 def seekable(self): 227 return True 228 229 def writable(self): 230 return True 231 232 def write(self, b): 233 b = bytes(b) 234 n = -1 235 if self._blocker_char: 236 try: 237 n = b.index(self._blocker_char) 238 except ValueError: 239 pass 240 else: 241 if n > 0: 242 # write data up to the first blocker 243 self._write_stack.append(b[:n]) 244 return n 245 else: 246 # cancel blocker and indicate would block 247 self._blocker_char = None 248 return None 249 self._write_stack.append(b) 250 return len(b) 251 252class 
CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 253 BlockingIOError = io.BlockingIOError 254 255class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 256 BlockingIOError = pyio.BlockingIOError 257 258 259class IOTest(unittest.TestCase): 260 261 def setUp(self): 262 support.unlink(support.TESTFN) 263 264 def tearDown(self): 265 support.unlink(support.TESTFN) 266 267 def write_ops(self, f): 268 self.assertEqual(f.write(b"blah."), 5) 269 f.truncate(0) 270 self.assertEqual(f.tell(), 5) 271 f.seek(0) 272 273 self.assertEqual(f.write(b"blah."), 5) 274 self.assertEqual(f.seek(0), 0) 275 self.assertEqual(f.write(b"Hello."), 6) 276 self.assertEqual(f.tell(), 6) 277 self.assertEqual(f.seek(-1, 1), 5) 278 self.assertEqual(f.tell(), 5) 279 buffer = bytearray(b" world\n\n\n") 280 self.assertEqual(f.write(buffer), 9) 281 buffer[:] = b"*" * 9 # Overwrite our copy of the data 282 self.assertEqual(f.seek(0), 0) 283 self.assertEqual(f.write(b"h"), 1) 284 self.assertEqual(f.seek(-1, 2), 13) 285 self.assertEqual(f.tell(), 13) 286 287 self.assertEqual(f.truncate(12), 12) 288 self.assertEqual(f.tell(), 13) 289 self.assertRaises(TypeError, f.seek, 0.0) 290 291 def read_ops(self, f, buffered=False): 292 data = f.read(5) 293 self.assertEqual(data, b"hello") 294 data = byteslike(data) 295 self.assertEqual(f.readinto(data), 5) 296 self.assertEqual(data.tobytes(), b" worl") 297 data = bytearray(5) 298 self.assertEqual(f.readinto(data), 2) 299 self.assertEqual(len(data), 5) 300 self.assertEqual(data[:2], b"d\n") 301 self.assertEqual(f.seek(0), 0) 302 self.assertEqual(f.read(20), b"hello world\n") 303 self.assertEqual(f.read(1), b"") 304 self.assertEqual(f.readinto(byteslike(b"x")), 0) 305 self.assertEqual(f.seek(-6, 2), 6) 306 self.assertEqual(f.read(5), b"world") 307 self.assertEqual(f.read(0), b"") 308 self.assertEqual(f.readinto(byteslike()), 0) 309 self.assertEqual(f.seek(-6, 1), 5) 310 self.assertEqual(f.read(5), b" worl") 311 self.assertEqual(f.tell(), 10) 312 
self.assertRaises(TypeError, f.seek, 0.0) 313 if buffered: 314 f.seek(0) 315 self.assertEqual(f.read(), b"hello world\n") 316 f.seek(6) 317 self.assertEqual(f.read(), b"world\n") 318 self.assertEqual(f.read(), b"") 319 320 LARGE = 2**31 321 322 def large_file_ops(self, f): 323 assert f.readable() 324 assert f.writable() 325 self.assertEqual(f.seek(self.LARGE), self.LARGE) 326 self.assertEqual(f.tell(), self.LARGE) 327 self.assertEqual(f.write(b"xxx"), 3) 328 self.assertEqual(f.tell(), self.LARGE + 3) 329 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 330 self.assertEqual(f.truncate(), self.LARGE + 2) 331 self.assertEqual(f.tell(), self.LARGE + 2) 332 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 333 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 334 self.assertEqual(f.tell(), self.LARGE + 2) 335 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 336 self.assertEqual(f.seek(-1, 2), self.LARGE) 337 self.assertEqual(f.read(2), b"x") 338 339 def test_invalid_operations(self): 340 # Try writing on a file opened in read mode and vice-versa. 
341 for mode in ("w", "wb"): 342 with self.open(support.TESTFN, mode) as fp: 343 self.assertRaises(IOError, fp.read) 344 self.assertRaises(IOError, fp.readline) 345 with self.open(support.TESTFN, "rb") as fp: 346 self.assertRaises(IOError, fp.write, b"blah") 347 self.assertRaises(IOError, fp.writelines, [b"blah\n"]) 348 with self.open(support.TESTFN, "r") as fp: 349 self.assertRaises(IOError, fp.write, "blah") 350 self.assertRaises(IOError, fp.writelines, ["blah\n"]) 351 352 def test_open_handles_NUL_chars(self): 353 fn_with_NUL = 'foo\0bar' 354 self.assertRaises(TypeError, self.open, fn_with_NUL, 'w') 355 356 bytes_fn = fn_with_NUL.encode('ascii') 357 with warnings.catch_warnings(): 358 warnings.simplefilter("ignore", DeprecationWarning) 359 self.assertRaises(TypeError, self.open, bytes_fn, 'w') 360 361 def test_raw_file_io(self): 362 with self.open(support.TESTFN, "wb", buffering=0) as f: 363 self.assertEqual(f.readable(), False) 364 self.assertEqual(f.writable(), True) 365 self.assertEqual(f.seekable(), True) 366 self.write_ops(f) 367 with self.open(support.TESTFN, "rb", buffering=0) as f: 368 self.assertEqual(f.readable(), True) 369 self.assertEqual(f.writable(), False) 370 self.assertEqual(f.seekable(), True) 371 self.read_ops(f) 372 373 def test_buffered_file_io(self): 374 with self.open(support.TESTFN, "wb") as f: 375 self.assertEqual(f.readable(), False) 376 self.assertEqual(f.writable(), True) 377 self.assertEqual(f.seekable(), True) 378 self.write_ops(f) 379 with self.open(support.TESTFN, "rb") as f: 380 self.assertEqual(f.readable(), True) 381 self.assertEqual(f.writable(), False) 382 self.assertEqual(f.seekable(), True) 383 self.read_ops(f, True) 384 385 def test_readline(self): 386 with self.open(support.TESTFN, "wb") as f: 387 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 388 with self.open(support.TESTFN, "rb") as f: 389 self.assertEqual(f.readline(), b"abc\n") 390 self.assertEqual(f.readline(10), b"def\n") 391 self.assertEqual(f.readline(2), 
b"xy") 392 self.assertEqual(f.readline(4), b"zzy\n") 393 self.assertEqual(f.readline(), b"foo\x00bar\n") 394 self.assertEqual(f.readline(None), b"another line") 395 self.assertRaises(TypeError, f.readline, 5.3) 396 with self.open(support.TESTFN, "r") as f: 397 self.assertRaises(TypeError, f.readline, 5.3) 398 399 def test_raw_bytes_io(self): 400 f = self.BytesIO() 401 self.write_ops(f) 402 data = f.getvalue() 403 self.assertEqual(data, b"hello world\n") 404 f = self.BytesIO(data) 405 self.read_ops(f, True) 406 407 def test_large_file_ops(self): 408 # On Windows and Mac OSX this test comsumes large resources; It takes 409 # a long time to build the >2GB file and takes >2GB of disk space 410 # therefore the resource must be enabled to run this test. 411 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 412 support.requires( 413 'largefile', 414 'test requires %s bytes and a long time to run' % self.LARGE) 415 with self.open(support.TESTFN, "w+b", 0) as f: 416 self.large_file_ops(f) 417 with self.open(support.TESTFN, "w+b") as f: 418 self.large_file_ops(f) 419 420 def test_with_open(self): 421 for bufsize in (0, 1, 100): 422 f = None 423 with self.open(support.TESTFN, "wb", bufsize) as f: 424 f.write(b"xxx") 425 self.assertEqual(f.closed, True) 426 f = None 427 try: 428 with self.open(support.TESTFN, "wb", bufsize) as f: 429 1 // 0 430 except ZeroDivisionError: 431 self.assertEqual(f.closed, True) 432 else: 433 self.fail("1 // 0 didn't raise an exception") 434 435 # issue 5008 436 def test_append_mode_tell(self): 437 with self.open(support.TESTFN, "wb") as f: 438 f.write(b"xxx") 439 with self.open(support.TESTFN, "ab", buffering=0) as f: 440 self.assertEqual(f.tell(), 3) 441 with self.open(support.TESTFN, "ab") as f: 442 self.assertEqual(f.tell(), 3) 443 with self.open(support.TESTFN, "a") as f: 444 self.assertGreater(f.tell(), 0) 445 446 def test_destructor(self): 447 record = [] 448 class MyFileIO(self.FileIO): 449 def __del__(self): 450 record.append(1) 451 
try: 452 f = super(MyFileIO, self).__del__ 453 except AttributeError: 454 pass 455 else: 456 f() 457 def close(self): 458 record.append(2) 459 super(MyFileIO, self).close() 460 def flush(self): 461 record.append(3) 462 super(MyFileIO, self).flush() 463 f = MyFileIO(support.TESTFN, "wb") 464 f.write(b"xxx") 465 del f 466 support.gc_collect() 467 self.assertEqual(record, [1, 2, 3]) 468 with self.open(support.TESTFN, "rb") as f: 469 self.assertEqual(f.read(), b"xxx") 470 471 def _check_base_destructor(self, base): 472 record = [] 473 class MyIO(base): 474 def __init__(self): 475 # This exercises the availability of attributes on object 476 # destruction. 477 # (in the C version, close() is called by the tp_dealloc 478 # function, not by __del__) 479 self.on_del = 1 480 self.on_close = 2 481 self.on_flush = 3 482 def __del__(self): 483 record.append(self.on_del) 484 try: 485 f = super(MyIO, self).__del__ 486 except AttributeError: 487 pass 488 else: 489 f() 490 def close(self): 491 record.append(self.on_close) 492 super(MyIO, self).close() 493 def flush(self): 494 record.append(self.on_flush) 495 super(MyIO, self).flush() 496 f = MyIO() 497 del f 498 support.gc_collect() 499 self.assertEqual(record, [1, 2, 3]) 500 501 def test_IOBase_destructor(self): 502 self._check_base_destructor(self.IOBase) 503 504 def test_RawIOBase_destructor(self): 505 self._check_base_destructor(self.RawIOBase) 506 507 def test_BufferedIOBase_destructor(self): 508 self._check_base_destructor(self.BufferedIOBase) 509 510 def test_TextIOBase_destructor(self): 511 self._check_base_destructor(self.TextIOBase) 512 513 def test_close_flushes(self): 514 with self.open(support.TESTFN, "wb") as f: 515 f.write(b"xxx") 516 with self.open(support.TESTFN, "rb") as f: 517 self.assertEqual(f.read(), b"xxx") 518 519 def test_array_writes(self): 520 a = array.array(b'i', range(10)) 521 n = len(a.tostring()) 522 with self.open(support.TESTFN, "wb", 0) as f: 523 self.assertEqual(f.write(a), n) 524 with 
self.open(support.TESTFN, "wb") as f: 525 self.assertEqual(f.write(a), n) 526 527 def test_closefd(self): 528 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 529 closefd=False) 530 531 def test_read_closed(self): 532 with self.open(support.TESTFN, "w") as f: 533 f.write("egg\n") 534 with self.open(support.TESTFN, "r") as f: 535 file = self.open(f.fileno(), "r", closefd=False) 536 self.assertEqual(file.read(), "egg\n") 537 file.seek(0) 538 file.close() 539 self.assertRaises(ValueError, file.read) 540 541 def test_no_closefd_with_filename(self): 542 # can't use closefd in combination with a file name 543 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) 544 545 def test_closefd_attr(self): 546 with self.open(support.TESTFN, "wb") as f: 547 f.write(b"egg\n") 548 with self.open(support.TESTFN, "r") as f: 549 self.assertEqual(f.buffer.raw.closefd, True) 550 file = self.open(f.fileno(), "r", closefd=False) 551 self.assertEqual(file.buffer.raw.closefd, False) 552 553 def test_garbage_collection(self): 554 # FileIO objects are collected, and collecting them flushes 555 # all data to disk. 556 f = self.FileIO(support.TESTFN, "wb") 557 f.write(b"abcxxx") 558 f.f = f 559 wr = weakref.ref(f) 560 del f 561 support.gc_collect() 562 self.assertIsNone(wr(), wr) 563 with self.open(support.TESTFN, "rb") as f: 564 self.assertEqual(f.read(), b"abcxxx") 565 566 def test_unbounded_file(self): 567 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 
568 zero = "/dev/zero" 569 if not os.path.exists(zero): 570 self.skipTest("{0} does not exist".format(zero)) 571 if sys.maxsize > 0x7FFFFFFF: 572 self.skipTest("test can only run in a 32-bit address space") 573 if support.real_max_memuse < support._2G: 574 self.skipTest("test requires at least 2GB of memory") 575 with self.open(zero, "rb", buffering=0) as f: 576 self.assertRaises(OverflowError, f.read) 577 with self.open(zero, "rb") as f: 578 self.assertRaises(OverflowError, f.read) 579 with self.open(zero, "r") as f: 580 self.assertRaises(OverflowError, f.read) 581 582 def check_flush_error_on_close(self, *args, **kwargs): 583 # Test that the file is closed despite failed flush 584 # and that flush() is called before file closed. 585 f = self.open(*args, **kwargs) 586 closed = [] 587 def bad_flush(): 588 closed[:] = [f.closed] 589 raise IOError() 590 f.flush = bad_flush 591 self.assertRaises(IOError, f.close) # exception not swallowed 592 self.assertTrue(f.closed) 593 self.assertTrue(closed) # flush() called 594 self.assertFalse(closed[0]) # flush() called before file closed 595 f.flush = lambda: None # break reference loop 596 597 def test_flush_error_on_close(self): 598 # raw file 599 # Issue #5700: io.FileIO calls flush() after file closed 600 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) 601 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 602 self.check_flush_error_on_close(fd, 'wb', buffering=0) 603 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 604 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 605 os.close(fd) 606 # buffered io 607 self.check_flush_error_on_close(support.TESTFN, 'wb') 608 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 609 self.check_flush_error_on_close(fd, 'wb') 610 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 611 self.check_flush_error_on_close(fd, 'wb', closefd=False) 612 os.close(fd) 613 # text io 614 self.check_flush_error_on_close(support.TESTFN, 'w') 615 fd = 
os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 616 self.check_flush_error_on_close(fd, 'w') 617 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 618 self.check_flush_error_on_close(fd, 'w', closefd=False) 619 os.close(fd) 620 621 def test_multi_close(self): 622 f = self.open(support.TESTFN, "wb", buffering=0) 623 f.close() 624 f.close() 625 f.close() 626 self.assertRaises(ValueError, f.flush) 627 628 def test_RawIOBase_read(self): 629 # Exercise the default RawIOBase.read() implementation (which calls 630 # readinto() internally). 631 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 632 self.assertEqual(rawio.read(2), b"ab") 633 self.assertEqual(rawio.read(2), b"c") 634 self.assertEqual(rawio.read(2), b"d") 635 self.assertEqual(rawio.read(2), None) 636 self.assertEqual(rawio.read(2), b"ef") 637 self.assertEqual(rawio.read(2), b"g") 638 self.assertEqual(rawio.read(2), None) 639 self.assertEqual(rawio.read(2), b"") 640 641 def test_fileio_closefd(self): 642 # Issue #4841 643 with self.open(__file__, 'rb') as f1, \ 644 self.open(__file__, 'rb') as f2: 645 fileio = self.FileIO(f1.fileno(), closefd=False) 646 # .__init__() must not close f1 647 fileio.__init__(f2.fileno(), closefd=False) 648 f1.readline() 649 # .close() must not close f2 650 fileio.close() 651 f2.readline() 652 653 def test_nonbuffered_textio(self): 654 with warnings.catch_warnings(record=True) as recorded: 655 with self.assertRaises(ValueError): 656 self.open(support.TESTFN, 'w', buffering=0) 657 support.gc_collect() 658 self.assertEqual(recorded, []) 659 660 def test_invalid_newline(self): 661 with warnings.catch_warnings(record=True) as recorded: 662 with self.assertRaises(ValueError): 663 self.open(support.TESTFN, 'w', newline='invalid') 664 support.gc_collect() 665 self.assertEqual(recorded, []) 666 667 def test_buffered_readinto_mixin(self): 668 # Test the implementation provided by BufferedIOBase 669 class Stream(self.BufferedIOBase): 670 def read(self, size): 671 return 
b"12345" 672 stream = Stream() 673 buffer = byteslike(5) 674 self.assertEqual(stream.readinto(buffer), 5) 675 self.assertEqual(buffer.tobytes(), b"12345") 676 677 678class CIOTest(IOTest): 679 680 def test_IOBase_finalize(self): 681 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 682 # class which inherits IOBase and an object of this class are caught 683 # in a reference cycle and close() is already in the method cache. 684 class MyIO(self.IOBase): 685 def close(self): 686 pass 687 688 # create an instance to populate the method cache 689 MyIO() 690 obj = MyIO() 691 obj.obj = obj 692 wr = weakref.ref(obj) 693 del MyIO 694 del obj 695 support.gc_collect() 696 self.assertIsNone(wr(), wr) 697 698class PyIOTest(IOTest): 699 test_array_writes = unittest.skip( 700 "len(array.array) returns number of elements rather than bytelength" 701 )(IOTest.test_array_writes) 702 703 704class CommonBufferedTests: 705 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 706 707 def test_detach(self): 708 raw = self.MockRawIO() 709 buf = self.tp(raw) 710 self.assertIs(buf.detach(), raw) 711 self.assertRaises(ValueError, buf.detach) 712 713 repr(buf) # Should still work 714 715 def test_fileno(self): 716 rawio = self.MockRawIO() 717 bufio = self.tp(rawio) 718 719 self.assertEqual(42, bufio.fileno()) 720 721 def test_invalid_args(self): 722 rawio = self.MockRawIO() 723 bufio = self.tp(rawio) 724 # Invalid whence 725 self.assertRaises(ValueError, bufio.seek, 0, -1) 726 self.assertRaises(ValueError, bufio.seek, 0, 3) 727 728 def test_override_destructor(self): 729 tp = self.tp 730 record = [] 731 class MyBufferedIO(tp): 732 def __del__(self): 733 record.append(1) 734 try: 735 f = super(MyBufferedIO, self).__del__ 736 except AttributeError: 737 pass 738 else: 739 f() 740 def close(self): 741 record.append(2) 742 super(MyBufferedIO, self).close() 743 def flush(self): 744 record.append(3) 745 super(MyBufferedIO, self).flush() 746 rawio = self.MockRawIO() 
747 bufio = MyBufferedIO(rawio) 748 writable = bufio.writable() 749 del bufio 750 support.gc_collect() 751 if writable: 752 self.assertEqual(record, [1, 2, 3]) 753 else: 754 self.assertEqual(record, [1, 2]) 755 756 def test_context_manager(self): 757 # Test usability as a context manager 758 rawio = self.MockRawIO() 759 bufio = self.tp(rawio) 760 def _with(): 761 with bufio: 762 pass 763 _with() 764 # bufio should now be closed, and using it a second time should raise 765 # a ValueError. 766 self.assertRaises(ValueError, _with) 767 768 def test_error_through_destructor(self): 769 # Test that the exception state is not modified by a destructor, 770 # even if close() fails. 771 rawio = self.CloseFailureIO() 772 def f(): 773 self.tp(rawio).xyzzy 774 with support.captured_output("stderr") as s: 775 self.assertRaises(AttributeError, f) 776 s = s.getvalue().strip() 777 if s: 778 # The destructor *may* have printed an unraisable error, check it 779 self.assertEqual(len(s.splitlines()), 1) 780 self.assertTrue(s.startswith("Exception IOError: "), s) 781 self.assertTrue(s.endswith(" ignored"), s) 782 783 def test_repr(self): 784 raw = self.MockRawIO() 785 b = self.tp(raw) 786 clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) 787 self.assertEqual(repr(b), "<%s>" % clsname) 788 raw.name = "dummy" 789 self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname) 790 raw.name = b"dummy" 791 self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) 792 793 def test_flush_error_on_close(self): 794 # Test that buffered file is closed despite failed flush 795 # and that flush() is called before file closed. 
796 raw = self.MockRawIO() 797 closed = [] 798 def bad_flush(): 799 closed[:] = [b.closed, raw.closed] 800 raise IOError() 801 raw.flush = bad_flush 802 b = self.tp(raw) 803 self.assertRaises(IOError, b.close) # exception not swallowed 804 self.assertTrue(b.closed) 805 self.assertTrue(raw.closed) 806 self.assertTrue(closed) # flush() called 807 self.assertFalse(closed[0]) # flush() called before file closed 808 self.assertFalse(closed[1]) 809 raw.flush = lambda: None # break reference loop 810 811 def test_close_error_on_close(self): 812 raw = self.MockRawIO() 813 def bad_flush(): 814 raise IOError('flush') 815 def bad_close(): 816 raise IOError('close') 817 raw.close = bad_close 818 b = self.tp(raw) 819 b.flush = bad_flush 820 with self.assertRaises(IOError) as err: # exception not swallowed 821 b.close() 822 self.assertEqual(err.exception.args, ('close',)) 823 self.assertFalse(b.closed) 824 825 def test_multi_close(self): 826 raw = self.MockRawIO() 827 b = self.tp(raw) 828 b.close() 829 b.close() 830 b.close() 831 self.assertRaises(ValueError, b.flush) 832 833 def test_readonly_attributes(self): 834 raw = self.MockRawIO() 835 buf = self.tp(raw) 836 x = self.MockRawIO() 837 with self.assertRaises((AttributeError, TypeError)): 838 buf.raw = x 839 840 841class SizeofTest: 842 843 @support.cpython_only 844 def test_sizeof(self): 845 bufsize1 = 4096 846 bufsize2 = 8192 847 rawio = self.MockRawIO() 848 bufio = self.tp(rawio, buffer_size=bufsize1) 849 size = sys.getsizeof(bufio) - bufsize1 850 rawio = self.MockRawIO() 851 bufio = self.tp(rawio, buffer_size=bufsize2) 852 self.assertEqual(sys.getsizeof(bufio), size + bufsize2) 853 854 855class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 856 read_mode = "rb" 857 858 def test_constructor(self): 859 rawio = self.MockRawIO([b"abc"]) 860 bufio = self.tp(rawio) 861 bufio.__init__(rawio) 862 bufio.__init__(rawio, buffer_size=1024) 863 bufio.__init__(rawio, buffer_size=16) 864 self.assertEqual(b"abc", 
bufio.read()) 865 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 866 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 867 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 868 rawio = self.MockRawIO([b"abc"]) 869 bufio.__init__(rawio) 870 self.assertEqual(b"abc", bufio.read()) 871 872 def test_uninitialized(self): 873 bufio = self.tp.__new__(self.tp) 874 del bufio 875 bufio = self.tp.__new__(self.tp) 876 self.assertRaisesRegexp((ValueError, AttributeError), 877 'uninitialized|has no attribute', 878 bufio.read, 0) 879 bufio.__init__(self.MockRawIO()) 880 self.assertEqual(bufio.read(0), b'') 881 882 def test_read(self): 883 for arg in (None, 7): 884 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 885 bufio = self.tp(rawio) 886 self.assertEqual(b"abcdefg", bufio.read(arg)) 887 # Invalid args 888 self.assertRaises(ValueError, bufio.read, -2) 889 890 def test_read1(self): 891 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 892 bufio = self.tp(rawio) 893 self.assertEqual(b"a", bufio.read(1)) 894 self.assertEqual(b"b", bufio.read1(1)) 895 self.assertEqual(rawio._reads, 1) 896 self.assertEqual(b"c", bufio.read1(100)) 897 self.assertEqual(rawio._reads, 1) 898 self.assertEqual(b"d", bufio.read1(100)) 899 self.assertEqual(rawio._reads, 2) 900 self.assertEqual(b"efg", bufio.read1(100)) 901 self.assertEqual(rawio._reads, 3) 902 self.assertEqual(b"", bufio.read1(100)) 903 self.assertEqual(rawio._reads, 4) 904 # Invalid args 905 self.assertRaises(ValueError, bufio.read1, -1) 906 907 def test_readinto(self): 908 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 909 bufio = self.tp(rawio) 910 b = bytearray(2) 911 self.assertEqual(bufio.readinto(b), 2) 912 self.assertEqual(b, b"ab") 913 self.assertEqual(bufio.readinto(b), 2) 914 self.assertEqual(b, b"cd") 915 self.assertEqual(bufio.readinto(b), 2) 916 self.assertEqual(b, b"ef") 917 self.assertEqual(bufio.readinto(b), 1) 918 self.assertEqual(b, b"gf") 919 
self.assertEqual(bufio.readinto(b), 0) 920 self.assertEqual(b, b"gf") 921 922 def test_readlines(self): 923 def bufio(): 924 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 925 return self.tp(rawio) 926 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 927 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 928 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 929 930 def test_buffering(self): 931 data = b"abcdefghi" 932 dlen = len(data) 933 934 tests = [ 935 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 936 [ 100, [ 3, 3, 3], [ dlen ] ], 937 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 938 ] 939 940 for bufsize, buf_read_sizes, raw_read_sizes in tests: 941 rawio = self.MockFileIO(data) 942 bufio = self.tp(rawio, buffer_size=bufsize) 943 pos = 0 944 for nbytes in buf_read_sizes: 945 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 946 pos += nbytes 947 # this is mildly implementation-dependent 948 self.assertEqual(rawio.read_history, raw_read_sizes) 949 950 def test_read_non_blocking(self): 951 # Inject some None's in there to simulate EWOULDBLOCK 952 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 953 bufio = self.tp(rawio) 954 self.assertEqual(b"abcd", bufio.read(6)) 955 self.assertEqual(b"e", bufio.read(1)) 956 self.assertEqual(b"fg", bufio.read()) 957 self.assertEqual(b"", bufio.peek(1)) 958 self.assertIsNone(bufio.read()) 959 self.assertEqual(b"", bufio.read()) 960 961 rawio = self.MockRawIO((b"a", None, None)) 962 self.assertEqual(b"a", rawio.readall()) 963 self.assertIsNone(rawio.readall()) 964 965 def test_read_past_eof(self): 966 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 967 bufio = self.tp(rawio) 968 969 self.assertEqual(b"abcdefg", bufio.read(9000)) 970 971 def test_read_all(self): 972 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 973 bufio = self.tp(rawio) 974 975 self.assertEqual(b"abcdefg", bufio.read()) 976 977 @unittest.skipUnless(threading, 'Threading required for this test.') 978 
@support.requires_resource('cpu') 979 def test_threads(self): 980 try: 981 # Write out many bytes with exactly the same number of 0's, 982 # 1's... 255's. This will help us check that concurrent reading 983 # doesn't duplicate or forget contents. 984 N = 1000 985 l = list(range(256)) * N 986 random.shuffle(l) 987 s = bytes(bytearray(l)) 988 with self.open(support.TESTFN, "wb") as f: 989 f.write(s) 990 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: 991 bufio = self.tp(raw, 8) 992 errors = [] 993 results = [] 994 def f(): 995 try: 996 # Intra-buffer read then buffer-flushing read 997 for n in cycle([1, 19]): 998 s = bufio.read(n) 999 if not s: 1000 break 1001 # list.append() is atomic 1002 results.append(s) 1003 except Exception as e: 1004 errors.append(e) 1005 raise 1006 threads = [threading.Thread(target=f) for x in range(20)] 1007 with support.start_threads(threads): 1008 time.sleep(0.02) # yield 1009 self.assertFalse(errors, 1010 "the following exceptions were caught: %r" % errors) 1011 s = b''.join(results) 1012 for i in range(256): 1013 c = bytes(bytearray([i])) 1014 self.assertEqual(s.count(c), N) 1015 finally: 1016 support.unlink(support.TESTFN) 1017 1018 def test_misbehaved_io(self): 1019 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1020 bufio = self.tp(rawio) 1021 self.assertRaises(IOError, bufio.seek, 0) 1022 self.assertRaises(IOError, bufio.tell) 1023 1024 def test_no_extraneous_read(self): 1025 # Issue #9550; when the raw IO object has satisfied the read request, 1026 # we should not issue any additional reads, otherwise it may block 1027 # (e.g. socket). 1028 bufsize = 16 1029 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 1030 rawio = self.MockRawIO([b"x" * n]) 1031 bufio = self.tp(rawio, bufsize) 1032 self.assertEqual(bufio.read(n), b"x" * n) 1033 # Simple case: one raw read is enough to satisfy the request. 
1034 self.assertEqual(rawio._extraneous_reads, 0, 1035 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1036 # A more complex case where two raw reads are needed to satisfy 1037 # the request. 1038 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 1039 bufio = self.tp(rawio, bufsize) 1040 self.assertEqual(bufio.read(n), b"x" * n) 1041 self.assertEqual(rawio._extraneous_reads, 0, 1042 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1043 1044 1045class CBufferedReaderTest(BufferedReaderTest, SizeofTest): 1046 tp = io.BufferedReader 1047 1048 def test_constructor(self): 1049 BufferedReaderTest.test_constructor(self) 1050 # The allocation can succeed on 32-bit builds, e.g. with more 1051 # than 2GB RAM and a 64-bit kernel. 1052 if sys.maxsize > 0x7FFFFFFF: 1053 rawio = self.MockRawIO() 1054 bufio = self.tp(rawio) 1055 self.assertRaises((OverflowError, MemoryError, ValueError), 1056 bufio.__init__, rawio, sys.maxsize) 1057 1058 def test_initialization(self): 1059 rawio = self.MockRawIO([b"abc"]) 1060 bufio = self.tp(rawio) 1061 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1062 self.assertRaises(ValueError, bufio.read) 1063 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1064 self.assertRaises(ValueError, bufio.read) 1065 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1066 self.assertRaises(ValueError, bufio.read) 1067 1068 def test_misbehaved_io_read(self): 1069 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1070 bufio = self.tp(rawio) 1071 # _pyio.BufferedReader seems to implement reading different, so that 1072 # checking this is not so easy. 1073 self.assertRaises(IOError, bufio.read, 10) 1074 1075 def test_garbage_collection(self): 1076 # C BufferedReader objects are collected. 
1077 # The Python version has __del__, so it ends into gc.garbage instead 1078 rawio = self.FileIO(support.TESTFN, "w+b") 1079 f = self.tp(rawio) 1080 f.f = f 1081 wr = weakref.ref(f) 1082 del f 1083 support.gc_collect() 1084 self.assertIsNone(wr(), wr) 1085 1086 def test_args_error(self): 1087 # Issue #17275 1088 with self.assertRaisesRegexp(TypeError, "BufferedReader"): 1089 self.tp(io.BytesIO(), 1024, 1024, 1024) 1090 1091 1092class PyBufferedReaderTest(BufferedReaderTest): 1093 tp = pyio.BufferedReader 1094 1095 1096class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1097 write_mode = "wb" 1098 1099 def test_constructor(self): 1100 rawio = self.MockRawIO() 1101 bufio = self.tp(rawio) 1102 bufio.__init__(rawio) 1103 bufio.__init__(rawio, buffer_size=1024) 1104 bufio.__init__(rawio, buffer_size=16) 1105 self.assertEqual(3, bufio.write(b"abc")) 1106 bufio.flush() 1107 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1108 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1109 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1110 bufio.__init__(rawio) 1111 self.assertEqual(3, bufio.write(b"ghi")) 1112 bufio.flush() 1113 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1114 1115 def test_uninitialized(self): 1116 bufio = self.tp.__new__(self.tp) 1117 del bufio 1118 bufio = self.tp.__new__(self.tp) 1119 self.assertRaisesRegexp((ValueError, AttributeError), 1120 'uninitialized|has no attribute', 1121 bufio.write, b'') 1122 bufio.__init__(self.MockRawIO()) 1123 self.assertEqual(bufio.write(b''), 0) 1124 1125 def test_detach_flush(self): 1126 raw = self.MockRawIO() 1127 buf = self.tp(raw) 1128 buf.write(b"howdy!") 1129 self.assertFalse(raw._write_stack) 1130 buf.detach() 1131 self.assertEqual(raw._write_stack, [b"howdy!"]) 1132 1133 def test_write(self): 1134 # Write to the buffered IO but don't overflow the buffer. 
1135 writer = self.MockRawIO() 1136 bufio = self.tp(writer, 8) 1137 bufio.write(b"abc") 1138 self.assertFalse(writer._write_stack) 1139 buffer = bytearray(b"def") 1140 bufio.write(buffer) 1141 buffer[:] = b"***" # Overwrite our copy of the data 1142 bufio.flush() 1143 self.assertEqual(b"".join(writer._write_stack), b"abcdef") 1144 1145 def test_write_overflow(self): 1146 writer = self.MockRawIO() 1147 bufio = self.tp(writer, 8) 1148 contents = b"abcdefghijklmnop" 1149 for n in range(0, len(contents), 3): 1150 bufio.write(contents[n:n+3]) 1151 flushed = b"".join(writer._write_stack) 1152 # At least (total - 8) bytes were implicitly flushed, perhaps more 1153 # depending on the implementation. 1154 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1155 1156 def check_writes(self, intermediate_func): 1157 # Lots of writes, test the flushed output is as expected. 1158 contents = bytes(range(256)) * 1000 1159 n = 0 1160 writer = self.MockRawIO() 1161 bufio = self.tp(writer, 13) 1162 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1163 def gen_sizes(): 1164 for size in count(1): 1165 for i in range(15): 1166 yield size 1167 sizes = gen_sizes() 1168 while n < len(contents): 1169 size = min(next(sizes), len(contents) - n) 1170 self.assertEqual(bufio.write(contents[n:n+size]), size) 1171 intermediate_func(bufio) 1172 n += size 1173 bufio.flush() 1174 self.assertEqual(contents, 1175 b"".join(writer._write_stack)) 1176 1177 def test_writes(self): 1178 self.check_writes(lambda bufio: None) 1179 1180 def test_writes_and_flushes(self): 1181 self.check_writes(lambda bufio: bufio.flush()) 1182 1183 def test_writes_and_seeks(self): 1184 def _seekabs(bufio): 1185 pos = bufio.tell() 1186 bufio.seek(pos + 1, 0) 1187 bufio.seek(pos - 1, 0) 1188 bufio.seek(pos, 0) 1189 self.check_writes(_seekabs) 1190 def _seekrel(bufio): 1191 pos = bufio.seek(0, 1) 1192 bufio.seek(+1, 1) 1193 bufio.seek(-1, 1) 1194 bufio.seek(pos, 0) 1195 self.check_writes(_seekrel) 
1196 1197 def test_writes_and_truncates(self): 1198 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1199 1200 def test_write_non_blocking(self): 1201 raw = self.MockNonBlockWriterIO() 1202 bufio = self.tp(raw, 8) 1203 1204 self.assertEqual(bufio.write(b"abcd"), 4) 1205 self.assertEqual(bufio.write(b"efghi"), 5) 1206 # 1 byte will be written, the rest will be buffered 1207 raw.block_on(b"k") 1208 self.assertEqual(bufio.write(b"jklmn"), 5) 1209 1210 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1211 raw.block_on(b"0") 1212 try: 1213 bufio.write(b"opqrwxyz0123456789") 1214 except self.BlockingIOError as e: 1215 written = e.characters_written 1216 else: 1217 self.fail("BlockingIOError should have been raised") 1218 self.assertEqual(written, 16) 1219 self.assertEqual(raw.pop_written(), 1220 b"abcdefghijklmnopqrwxyz") 1221 1222 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1223 s = raw.pop_written() 1224 # Previously buffered bytes were flushed 1225 self.assertTrue(s.startswith(b"01234567A"), s) 1226 1227 def test_write_and_rewind(self): 1228 raw = io.BytesIO() 1229 bufio = self.tp(raw, 4) 1230 self.assertEqual(bufio.write(b"abcdef"), 6) 1231 self.assertEqual(bufio.tell(), 6) 1232 bufio.seek(0, 0) 1233 self.assertEqual(bufio.write(b"XY"), 2) 1234 bufio.seek(6, 0) 1235 self.assertEqual(raw.getvalue(), b"XYcdef") 1236 self.assertEqual(bufio.write(b"123456"), 6) 1237 bufio.flush() 1238 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1239 1240 def test_flush(self): 1241 writer = self.MockRawIO() 1242 bufio = self.tp(writer, 8) 1243 bufio.write(b"abc") 1244 bufio.flush() 1245 self.assertEqual(b"abc", writer._write_stack[0]) 1246 1247 def test_writelines(self): 1248 l = [b'ab', b'cd', b'ef'] 1249 writer = self.MockRawIO() 1250 bufio = self.tp(writer, 8) 1251 bufio.writelines(l) 1252 bufio.flush() 1253 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1254 1255 def test_writelines_userlist(self): 1256 l = UserList([b'ab', 
b'cd', b'ef']) 1257 writer = self.MockRawIO() 1258 bufio = self.tp(writer, 8) 1259 bufio.writelines(l) 1260 bufio.flush() 1261 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1262 1263 def test_writelines_error(self): 1264 writer = self.MockRawIO() 1265 bufio = self.tp(writer, 8) 1266 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1267 self.assertRaises(TypeError, bufio.writelines, None) 1268 1269 def test_destructor(self): 1270 writer = self.MockRawIO() 1271 bufio = self.tp(writer, 8) 1272 bufio.write(b"abc") 1273 del bufio 1274 support.gc_collect() 1275 self.assertEqual(b"abc", writer._write_stack[0]) 1276 1277 def test_truncate(self): 1278 # Truncate implicitly flushes the buffer. 1279 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1280 bufio = self.tp(raw, 8) 1281 bufio.write(b"abcdef") 1282 self.assertEqual(bufio.truncate(3), 3) 1283 self.assertEqual(bufio.tell(), 6) 1284 with self.open(support.TESTFN, "rb", buffering=0) as f: 1285 self.assertEqual(f.read(), b"abc") 1286 1287 @unittest.skipUnless(threading, 'Threading required for this test.') 1288 @support.requires_resource('cpu') 1289 def test_threads(self): 1290 try: 1291 # Write out many bytes from many threads and test they were 1292 # all flushed. 1293 N = 1000 1294 contents = bytes(range(256)) * N 1295 sizes = cycle([1, 19]) 1296 n = 0 1297 queue = deque() 1298 while n < len(contents): 1299 size = next(sizes) 1300 queue.append(contents[n:n+size]) 1301 n += size 1302 del contents 1303 # We use a real file object because it allows us to 1304 # exercise situations where the GIL is released before 1305 # writing the buffer to the raw streams. This is in addition 1306 # to concurrency issues due to switching threads in the middle 1307 # of Python code. 
1308 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1309 bufio = self.tp(raw, 8) 1310 errors = [] 1311 def f(): 1312 try: 1313 while True: 1314 try: 1315 s = queue.popleft() 1316 except IndexError: 1317 return 1318 bufio.write(s) 1319 except Exception as e: 1320 errors.append(e) 1321 raise 1322 threads = [threading.Thread(target=f) for x in range(20)] 1323 with support.start_threads(threads): 1324 time.sleep(0.02) # yield 1325 self.assertFalse(errors, 1326 "the following exceptions were caught: %r" % errors) 1327 bufio.close() 1328 with self.open(support.TESTFN, "rb") as f: 1329 s = f.read() 1330 for i in range(256): 1331 self.assertEqual(s.count(bytes([i])), N) 1332 finally: 1333 support.unlink(support.TESTFN) 1334 1335 def test_misbehaved_io(self): 1336 rawio = self.MisbehavedRawIO() 1337 bufio = self.tp(rawio, 5) 1338 self.assertRaises(IOError, bufio.seek, 0) 1339 self.assertRaises(IOError, bufio.tell) 1340 self.assertRaises(IOError, bufio.write, b"abcdef") 1341 1342 def test_max_buffer_size_deprecation(self): 1343 with support.check_warnings(("max_buffer_size is deprecated", 1344 DeprecationWarning)): 1345 self.tp(self.MockRawIO(), 8, 12) 1346 1347 def test_write_error_on_close(self): 1348 raw = self.MockRawIO() 1349 def bad_write(b): 1350 raise IOError() 1351 raw.write = bad_write 1352 b = self.tp(raw) 1353 b.write(b'spam') 1354 self.assertRaises(IOError, b.close) # exception not swallowed 1355 self.assertTrue(b.closed) 1356 1357 1358class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1359 tp = io.BufferedWriter 1360 1361 def test_constructor(self): 1362 BufferedWriterTest.test_constructor(self) 1363 # The allocation can succeed on 32-bit builds, e.g. with more 1364 # than 2GB RAM and a 64-bit kernel. 
1365 if sys.maxsize > 0x7FFFFFFF: 1366 rawio = self.MockRawIO() 1367 bufio = self.tp(rawio) 1368 self.assertRaises((OverflowError, MemoryError, ValueError), 1369 bufio.__init__, rawio, sys.maxsize) 1370 1371 def test_initialization(self): 1372 rawio = self.MockRawIO() 1373 bufio = self.tp(rawio) 1374 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1375 self.assertRaises(ValueError, bufio.write, b"def") 1376 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1377 self.assertRaises(ValueError, bufio.write, b"def") 1378 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1379 self.assertRaises(ValueError, bufio.write, b"def") 1380 1381 def test_garbage_collection(self): 1382 # C BufferedWriter objects are collected, and collecting them flushes 1383 # all data to disk. 1384 # The Python version has __del__, so it ends into gc.garbage instead 1385 rawio = self.FileIO(support.TESTFN, "w+b") 1386 f = self.tp(rawio) 1387 f.write(b"123xxx") 1388 f.x = f 1389 wr = weakref.ref(f) 1390 del f 1391 support.gc_collect() 1392 self.assertIsNone(wr(), wr) 1393 with self.open(support.TESTFN, "rb") as f: 1394 self.assertEqual(f.read(), b"123xxx") 1395 1396 def test_args_error(self): 1397 # Issue #17275 1398 with self.assertRaisesRegexp(TypeError, "BufferedWriter"): 1399 self.tp(io.BytesIO(), 1024, 1024, 1024) 1400 1401 1402class PyBufferedWriterTest(BufferedWriterTest): 1403 tp = pyio.BufferedWriter 1404 1405class BufferedRWPairTest(unittest.TestCase): 1406 1407 def test_constructor(self): 1408 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1409 self.assertFalse(pair.closed) 1410 1411 def test_uninitialized(self): 1412 pair = self.tp.__new__(self.tp) 1413 del pair 1414 pair = self.tp.__new__(self.tp) 1415 self.assertRaisesRegexp((ValueError, AttributeError), 1416 'uninitialized|has no attribute', 1417 pair.read, 0) 1418 self.assertRaisesRegexp((ValueError, AttributeError), 1419 'uninitialized|has no attribute', 1420 
pair.write, b'') 1421 pair.__init__(self.MockRawIO(), self.MockRawIO()) 1422 self.assertEqual(pair.read(0), b'') 1423 self.assertEqual(pair.write(b''), 0) 1424 1425 def test_detach(self): 1426 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1427 self.assertRaises(self.UnsupportedOperation, pair.detach) 1428 1429 def test_constructor_max_buffer_size_deprecation(self): 1430 with support.check_warnings(("max_buffer_size is deprecated", 1431 DeprecationWarning)): 1432 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 1433 1434 def test_constructor_with_not_readable(self): 1435 class NotReadable(MockRawIO): 1436 def readable(self): 1437 return False 1438 1439 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) 1440 1441 def test_constructor_with_not_writeable(self): 1442 class NotWriteable(MockRawIO): 1443 def writable(self): 1444 return False 1445 1446 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) 1447 1448 def test_read(self): 1449 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1450 1451 self.assertEqual(pair.read(3), b"abc") 1452 self.assertEqual(pair.read(1), b"d") 1453 self.assertEqual(pair.read(), b"ef") 1454 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 1455 self.assertEqual(pair.read(None), b"abc") 1456 1457 def test_readlines(self): 1458 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 1459 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1460 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1461 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 1462 1463 def test_read1(self): 1464 # .read1() is delegated to the underlying reader object, so this test 1465 # can be shallow. 
1466 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1467 1468 self.assertEqual(pair.read1(3), b"abc") 1469 1470 def test_readinto(self): 1471 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1472 1473 data = byteslike(5) 1474 self.assertEqual(pair.readinto(data), 5) 1475 self.assertEqual(data.tobytes(), b"abcde") 1476 1477 def test_write(self): 1478 w = self.MockRawIO() 1479 pair = self.tp(self.MockRawIO(), w) 1480 1481 pair.write(b"abc") 1482 pair.flush() 1483 buffer = bytearray(b"def") 1484 pair.write(buffer) 1485 buffer[:] = b"***" # Overwrite our copy of the data 1486 pair.flush() 1487 self.assertEqual(w._write_stack, [b"abc", b"def"]) 1488 1489 def test_peek(self): 1490 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1491 1492 self.assertTrue(pair.peek(3).startswith(b"abc")) 1493 self.assertEqual(pair.read(3), b"abc") 1494 1495 def test_readable(self): 1496 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1497 self.assertTrue(pair.readable()) 1498 1499 def test_writeable(self): 1500 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1501 self.assertTrue(pair.writable()) 1502 1503 def test_seekable(self): 1504 # BufferedRWPairs are never seekable, even if their readers and writers 1505 # are. 1506 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1507 self.assertFalse(pair.seekable()) 1508 1509 # .flush() is delegated to the underlying writer object and has been 1510 # tested in the test_write method. 
1511 1512 def test_close_and_closed(self): 1513 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1514 self.assertFalse(pair.closed) 1515 pair.close() 1516 self.assertTrue(pair.closed) 1517 1518 def test_reader_close_error_on_close(self): 1519 def reader_close(): 1520 reader_non_existing 1521 reader = self.MockRawIO() 1522 reader.close = reader_close 1523 writer = self.MockRawIO() 1524 pair = self.tp(reader, writer) 1525 with self.assertRaises(NameError) as err: 1526 pair.close() 1527 self.assertIn('reader_non_existing', str(err.exception)) 1528 self.assertTrue(pair.closed) 1529 self.assertFalse(reader.closed) 1530 self.assertTrue(writer.closed) 1531 1532 def test_writer_close_error_on_close(self): 1533 def writer_close(): 1534 writer_non_existing 1535 reader = self.MockRawIO() 1536 writer = self.MockRawIO() 1537 writer.close = writer_close 1538 pair = self.tp(reader, writer) 1539 with self.assertRaises(NameError) as err: 1540 pair.close() 1541 self.assertIn('writer_non_existing', str(err.exception)) 1542 self.assertFalse(pair.closed) 1543 self.assertTrue(reader.closed) 1544 self.assertFalse(writer.closed) 1545 1546 def test_reader_writer_close_error_on_close(self): 1547 def reader_close(): 1548 reader_non_existing 1549 def writer_close(): 1550 writer_non_existing 1551 reader = self.MockRawIO() 1552 reader.close = reader_close 1553 writer = self.MockRawIO() 1554 writer.close = writer_close 1555 pair = self.tp(reader, writer) 1556 with self.assertRaises(NameError) as err: 1557 pair.close() 1558 self.assertIn('reader_non_existing', str(err.exception)) 1559 self.assertFalse(pair.closed) 1560 self.assertFalse(reader.closed) 1561 self.assertFalse(writer.closed) 1562 1563 def test_isatty(self): 1564 class SelectableIsAtty(MockRawIO): 1565 def __init__(self, isatty): 1566 MockRawIO.__init__(self) 1567 self._isatty = isatty 1568 1569 def isatty(self): 1570 return self._isatty 1571 1572 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 1573 
self.assertFalse(pair.isatty()) 1574 1575 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 1576 self.assertTrue(pair.isatty()) 1577 1578 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 1579 self.assertTrue(pair.isatty()) 1580 1581 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 1582 self.assertTrue(pair.isatty()) 1583 1584 def test_weakref_clearing(self): 1585 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 1586 ref = weakref.ref(brw) 1587 brw = None 1588 ref = None # Shouldn't segfault. 1589 1590class CBufferedRWPairTest(BufferedRWPairTest): 1591 tp = io.BufferedRWPair 1592 1593class PyBufferedRWPairTest(BufferedRWPairTest): 1594 tp = pyio.BufferedRWPair 1595 1596 1597class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 1598 read_mode = "rb+" 1599 write_mode = "wb+" 1600 1601 def test_constructor(self): 1602 BufferedReaderTest.test_constructor(self) 1603 BufferedWriterTest.test_constructor(self) 1604 1605 def test_uninitialized(self): 1606 BufferedReaderTest.test_uninitialized(self) 1607 BufferedWriterTest.test_uninitialized(self) 1608 1609 def test_read_and_write(self): 1610 raw = self.MockRawIO((b"asdf", b"ghjk")) 1611 rw = self.tp(raw, 8) 1612 1613 self.assertEqual(b"as", rw.read(2)) 1614 rw.write(b"ddd") 1615 rw.write(b"eee") 1616 self.assertFalse(raw._write_stack) # Buffer writes 1617 self.assertEqual(b"ghjk", rw.read()) 1618 self.assertEqual(b"dddeee", raw._write_stack[0]) 1619 1620 def test_seek_and_tell(self): 1621 raw = self.BytesIO(b"asdfghjkl") 1622 rw = self.tp(raw) 1623 1624 self.assertEqual(b"as", rw.read(2)) 1625 self.assertEqual(2, rw.tell()) 1626 rw.seek(0, 0) 1627 self.assertEqual(b"asdf", rw.read(4)) 1628 1629 rw.write(b"123f") 1630 rw.seek(0, 0) 1631 self.assertEqual(b"asdf123fl", rw.read()) 1632 self.assertEqual(9, rw.tell()) 1633 rw.seek(-4, 2) 1634 self.assertEqual(5, rw.tell()) 1635 rw.seek(2, 1) 1636 self.assertEqual(7, rw.tell()) 1637 self.assertEqual(b"fl", rw.read(11)) 1638 
rw.flush() 1639 self.assertEqual(b"asdf123fl", raw.getvalue()) 1640 1641 self.assertRaises(TypeError, rw.seek, 0.0) 1642 1643 def check_flush_and_read(self, read_func): 1644 raw = self.BytesIO(b"abcdefghi") 1645 bufio = self.tp(raw) 1646 1647 self.assertEqual(b"ab", read_func(bufio, 2)) 1648 bufio.write(b"12") 1649 self.assertEqual(b"ef", read_func(bufio, 2)) 1650 self.assertEqual(6, bufio.tell()) 1651 bufio.flush() 1652 self.assertEqual(6, bufio.tell()) 1653 self.assertEqual(b"ghi", read_func(bufio)) 1654 raw.seek(0, 0) 1655 raw.write(b"XYZ") 1656 # flush() resets the read buffer 1657 bufio.flush() 1658 bufio.seek(0, 0) 1659 self.assertEqual(b"XYZ", read_func(bufio, 3)) 1660 1661 def test_flush_and_read(self): 1662 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 1663 1664 def test_flush_and_readinto(self): 1665 def _readinto(bufio, n=-1): 1666 b = bytearray(n if n >= 0 else 9999) 1667 n = bufio.readinto(b) 1668 return bytes(b[:n]) 1669 self.check_flush_and_read(_readinto) 1670 1671 def test_flush_and_peek(self): 1672 def _peek(bufio, n=-1): 1673 # This relies on the fact that the buffer can contain the whole 1674 # raw stream, otherwise peek() can return less. 
1675 b = bufio.peek(n) 1676 if n != -1: 1677 b = b[:n] 1678 bufio.seek(len(b), 1) 1679 return b 1680 self.check_flush_and_read(_peek) 1681 1682 def test_flush_and_write(self): 1683 raw = self.BytesIO(b"abcdefghi") 1684 bufio = self.tp(raw) 1685 1686 bufio.write(b"123") 1687 bufio.flush() 1688 bufio.write(b"45") 1689 bufio.flush() 1690 bufio.seek(0, 0) 1691 self.assertEqual(b"12345fghi", raw.getvalue()) 1692 self.assertEqual(b"12345fghi", bufio.read()) 1693 1694 def test_threads(self): 1695 BufferedReaderTest.test_threads(self) 1696 BufferedWriterTest.test_threads(self) 1697 1698 def test_writes_and_peek(self): 1699 def _peek(bufio): 1700 bufio.peek(1) 1701 self.check_writes(_peek) 1702 def _peek(bufio): 1703 pos = bufio.tell() 1704 bufio.seek(-1, 1) 1705 bufio.peek(1) 1706 bufio.seek(pos, 0) 1707 self.check_writes(_peek) 1708 1709 def test_writes_and_reads(self): 1710 def _read(bufio): 1711 bufio.seek(-1, 1) 1712 bufio.read(1) 1713 self.check_writes(_read) 1714 1715 def test_writes_and_read1s(self): 1716 def _read1(bufio): 1717 bufio.seek(-1, 1) 1718 bufio.read1(1) 1719 self.check_writes(_read1) 1720 1721 def test_writes_and_readintos(self): 1722 def _read(bufio): 1723 bufio.seek(-1, 1) 1724 bufio.readinto(bytearray(1)) 1725 self.check_writes(_read) 1726 1727 def test_write_after_readahead(self): 1728 # Issue #6629: writing after the buffer was filled by readahead should 1729 # first rewind the raw stream. 1730 for overwrite_size in [1, 5]: 1731 raw = self.BytesIO(b"A" * 10) 1732 bufio = self.tp(raw, 4) 1733 # Trigger readahead 1734 self.assertEqual(bufio.read(1), b"A") 1735 self.assertEqual(bufio.tell(), 1) 1736 # Overwriting should rewind the raw stream if it needs so 1737 bufio.write(b"B" * overwrite_size) 1738 self.assertEqual(bufio.tell(), overwrite_size + 1) 1739 # If the write size was smaller than the buffer size, flush() and 1740 # check that rewind happens. 
1741 bufio.flush() 1742 self.assertEqual(bufio.tell(), overwrite_size + 1) 1743 s = raw.getvalue() 1744 self.assertEqual(s, 1745 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 1746 1747 def test_write_rewind_write(self): 1748 # Various combinations of reading / writing / seeking backwards / writing again 1749 def mutate(bufio, pos1, pos2): 1750 assert pos2 >= pos1 1751 # Fill the buffer 1752 bufio.seek(pos1) 1753 bufio.read(pos2 - pos1) 1754 bufio.write(b'\x02') 1755 # This writes earlier than the previous write, but still inside 1756 # the buffer. 1757 bufio.seek(pos1) 1758 bufio.write(b'\x01') 1759 1760 b = b"\x80\x81\x82\x83\x84" 1761 for i in range(0, len(b)): 1762 for j in range(i, len(b)): 1763 raw = self.BytesIO(b) 1764 bufio = self.tp(raw, 100) 1765 mutate(bufio, i, j) 1766 bufio.flush() 1767 expected = bytearray(b) 1768 expected[j] = 2 1769 expected[i] = 1 1770 self.assertEqual(raw.getvalue(), expected, 1771 "failed result for i=%d, j=%d" % (i, j)) 1772 1773 def test_truncate_after_read_or_write(self): 1774 raw = self.BytesIO(b"A" * 10) 1775 bufio = self.tp(raw, 100) 1776 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 1777 self.assertEqual(bufio.truncate(), 2) 1778 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 1779 self.assertEqual(bufio.truncate(), 4) 1780 1781 def test_misbehaved_io(self): 1782 BufferedReaderTest.test_misbehaved_io(self) 1783 BufferedWriterTest.test_misbehaved_io(self) 1784 1785 def test_interleaved_read_write(self): 1786 # Test for issue #12213 1787 with self.BytesIO(b'abcdefgh') as raw: 1788 with self.tp(raw, 100) as f: 1789 f.write(b"1") 1790 self.assertEqual(f.read(1), b'b') 1791 f.write(b'2') 1792 self.assertEqual(f.read1(1), b'd') 1793 f.write(b'3') 1794 buf = bytearray(1) 1795 f.readinto(buf) 1796 self.assertEqual(buf, b'f') 1797 f.write(b'4') 1798 self.assertEqual(f.peek(1), b'h') 1799 f.flush() 1800 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 1801 1802 with 
self.BytesIO(b'abc') as raw: 1803 with self.tp(raw, 100) as f: 1804 self.assertEqual(f.read(1), b'a') 1805 f.write(b"2") 1806 self.assertEqual(f.read(1), b'c') 1807 f.flush() 1808 self.assertEqual(raw.getvalue(), b'a2c') 1809 1810 def test_interleaved_readline_write(self): 1811 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 1812 with self.tp(raw) as f: 1813 f.write(b'1') 1814 self.assertEqual(f.readline(), b'b\n') 1815 f.write(b'2') 1816 self.assertEqual(f.readline(), b'def\n') 1817 f.write(b'3') 1818 self.assertEqual(f.readline(), b'\n') 1819 f.flush() 1820 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 1821 1822 1823class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, 1824 BufferedRandomTest, SizeofTest): 1825 tp = io.BufferedRandom 1826 1827 def test_constructor(self): 1828 BufferedRandomTest.test_constructor(self) 1829 # The allocation can succeed on 32-bit builds, e.g. with more 1830 # than 2GB RAM and a 64-bit kernel. 1831 if sys.maxsize > 0x7FFFFFFF: 1832 rawio = self.MockRawIO() 1833 bufio = self.tp(rawio) 1834 self.assertRaises((OverflowError, MemoryError, ValueError), 1835 bufio.__init__, rawio, sys.maxsize) 1836 1837 def test_garbage_collection(self): 1838 CBufferedReaderTest.test_garbage_collection(self) 1839 CBufferedWriterTest.test_garbage_collection(self) 1840 1841 def test_args_error(self): 1842 # Issue #17275 1843 with self.assertRaisesRegexp(TypeError, "BufferedRandom"): 1844 self.tp(io.BytesIO(), 1024, 1024, 1024) 1845 1846 1847class PyBufferedRandomTest(BufferedRandomTest): 1848 tp = pyio.BufferedRandom 1849 1850 1851# To fully exercise seek/tell, the StatefulIncrementalDecoder has these 1852# properties: 1853# - A single output character can correspond to many bytes of input. 1854# - The number of input bytes to complete the character can be 1855# undetermined until the last input byte is received. 1856# - The number of input bytes can vary depending on previous input. 
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        """Create a decoder in its initial state (I = O = 1, empty buffer)."""
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = 1, O = 1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered_bytes, flags), with flags == 0 right after reset().

        I and O are each XOR-ed with 1 and packed as i*100 + o, so the
        initial I = O = 1 encodes as 0.
        """
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered_bytes, flags) pair produced by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed *input* to the decoder and return the text produced so far.

        *input* may be any bytes-like object.  It is normalised through
        bytearray() so that iteration always yields integers; comparing
        against a text '.' only worked when iteration yielded 1-character
        strings and silently never matched for bytearray input.

        When *final* is true, a partially buffered word is flushed as if
        it had been terminated.
        """
        output = ''
        period = ord('.')
        for b in bytearray(input):
            if self.i == 0: # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i<n>' and 'o<n>' words update I and O (capped at 99; a missing
        number means 0) and produce no output.  Any other word is emitted
        padded/truncated to O characters (verbatim when O is 0) followed
        by a period.  The buffer is emptied in every case.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Codec lookups only resolve 'test_decoder' while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return a CodecInfo for 'test_decoder'.

        Returns None (implicitly) when the codec is disabled or *name* is
        anything else.  Encoding is delegated to latin-1; decoding is only
        available incrementally, through this class.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
1986 for input, eof, output in self.test_cases: 1987 d = StatefulIncrementalDecoder() 1988 self.assertEqual(d.decode(input, eof), output) 1989 1990 # Also test an unfinished decode, followed by forcing EOF. 1991 d = StatefulIncrementalDecoder() 1992 self.assertEqual(d.decode(b'oiabcd'), '') 1993 self.assertEqual(d.decode(b'', 1), 'abcd.') 1994 1995class TextIOWrapperTest(unittest.TestCase): 1996 1997 def setUp(self): 1998 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 1999 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 2000 support.unlink(support.TESTFN) 2001 2002 def tearDown(self): 2003 support.unlink(support.TESTFN) 2004 2005 def test_constructor(self): 2006 r = self.BytesIO(b"\xc3\xa9\n\n") 2007 b = self.BufferedReader(r, 1000) 2008 t = self.TextIOWrapper(b) 2009 t.__init__(b, encoding="latin1", newline="\r\n") 2010 self.assertEqual(t.encoding, "latin1") 2011 self.assertEqual(t.line_buffering, False) 2012 t.__init__(b, encoding="utf8", line_buffering=True) 2013 self.assertEqual(t.encoding, "utf8") 2014 self.assertEqual(t.line_buffering, True) 2015 self.assertEqual("\xe9\n", t.readline()) 2016 self.assertRaises(TypeError, t.__init__, b, newline=42) 2017 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 2018 2019 def test_uninitialized(self): 2020 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2021 del t 2022 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2023 self.assertRaises(Exception, repr, t) 2024 self.assertRaisesRegexp((ValueError, AttributeError), 2025 'uninitialized|has no attribute', 2026 t.read, 0) 2027 t.__init__(self.MockRawIO()) 2028 self.assertEqual(t.read(0), u'') 2029 2030 def test_non_text_encoding_codecs_are_rejected(self): 2031 # Ensure the constructor complains if passed a codec that isn't 2032 # marked as a text encoding 2033 # http://bugs.python.org/issue20404 2034 r = self.BytesIO() 2035 b = self.BufferedWriter(r) 2036 with support.check_py3k_warnings(): 2037 self.TextIOWrapper(b, 
encoding="hex_codec") 2038 2039 def test_detach(self): 2040 r = self.BytesIO() 2041 b = self.BufferedWriter(r) 2042 t = self.TextIOWrapper(b) 2043 self.assertIs(t.detach(), b) 2044 2045 t = self.TextIOWrapper(b, encoding="ascii") 2046 t.write("howdy") 2047 self.assertFalse(r.getvalue()) 2048 t.detach() 2049 self.assertEqual(r.getvalue(), b"howdy") 2050 self.assertRaises(ValueError, t.detach) 2051 2052 # Operations independent of the detached stream should still work 2053 repr(t) 2054 self.assertEqual(t.encoding, "ascii") 2055 self.assertEqual(t.errors, "strict") 2056 self.assertFalse(t.line_buffering) 2057 2058 def test_repr(self): 2059 raw = self.BytesIO("hello".encode("utf-8")) 2060 b = self.BufferedReader(raw) 2061 t = self.TextIOWrapper(b, encoding="utf-8") 2062 modname = self.TextIOWrapper.__module__ 2063 self.assertEqual(repr(t), 2064 "<%s.TextIOWrapper encoding='utf-8'>" % modname) 2065 raw.name = "dummy" 2066 self.assertEqual(repr(t), 2067 "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname) 2068 raw.name = b"dummy" 2069 self.assertEqual(repr(t), 2070 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 2071 2072 t.buffer.detach() 2073 repr(t) # Should not raise an exception 2074 2075 def test_line_buffering(self): 2076 r = self.BytesIO() 2077 b = self.BufferedWriter(r, 1000) 2078 t = self.TextIOWrapper(b, newline="\n", line_buffering=True) 2079 t.write("X") 2080 self.assertEqual(r.getvalue(), b"") # No flush happened 2081 t.write("Y\nZ") 2082 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 2083 t.write("A\rB") 2084 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 2085 2086 def test_encoding(self): 2087 # Check the encoding attribute is always set, and valid 2088 b = self.BytesIO() 2089 t = self.TextIOWrapper(b, encoding="utf8") 2090 self.assertEqual(t.encoding, "utf8") 2091 t = self.TextIOWrapper(b) 2092 self.assertIsNotNone(t.encoding) 2093 codecs.lookup(t.encoding) 2094 2095 def test_encoding_errors_reading(self): 2096 # (1) 
default 2097 b = self.BytesIO(b"abc\n\xff\n") 2098 t = self.TextIOWrapper(b, encoding="ascii") 2099 self.assertRaises(UnicodeError, t.read) 2100 # (2) explicit strict 2101 b = self.BytesIO(b"abc\n\xff\n") 2102 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2103 self.assertRaises(UnicodeError, t.read) 2104 # (3) ignore 2105 b = self.BytesIO(b"abc\n\xff\n") 2106 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 2107 self.assertEqual(t.read(), "abc\n\n") 2108 # (4) replace 2109 b = self.BytesIO(b"abc\n\xff\n") 2110 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 2111 self.assertEqual(t.read(), "abc\n\ufffd\n") 2112 2113 def test_encoding_errors_writing(self): 2114 # (1) default 2115 b = self.BytesIO() 2116 t = self.TextIOWrapper(b, encoding="ascii") 2117 self.assertRaises(UnicodeError, t.write, "\xff") 2118 # (2) explicit strict 2119 b = self.BytesIO() 2120 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2121 self.assertRaises(UnicodeError, t.write, "\xff") 2122 # (3) ignore 2123 b = self.BytesIO() 2124 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 2125 newline="\n") 2126 t.write("abc\xffdef\n") 2127 t.flush() 2128 self.assertEqual(b.getvalue(), b"abcdef\n") 2129 # (4) replace 2130 b = self.BytesIO() 2131 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 2132 newline="\n") 2133 t.write("abc\xffdef\n") 2134 t.flush() 2135 self.assertEqual(b.getvalue(), b"abc?def\n") 2136 2137 def test_newlines(self): 2138 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 2139 2140 tests = [ 2141 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 2142 [ '', input_lines ], 2143 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 2144 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 2145 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 2146 ] 2147 encodings = ( 2148 'utf-8', 'latin-1', 2149 'utf-16', 'utf-16-le', 'utf-16-be', 2150 'utf-32', 'utf-32-le', 
'utf-32-be', 2151 ) 2152 2153 # Try a range of buffer sizes to test the case where \r is the last 2154 # character in TextIOWrapper._pending_line. 2155 for encoding in encodings: 2156 # XXX: str.encode() should return bytes 2157 data = bytes(''.join(input_lines).encode(encoding)) 2158 for do_reads in (False, True): 2159 for bufsize in range(1, 10): 2160 for newline, exp_lines in tests: 2161 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 2162 textio = self.TextIOWrapper(bufio, newline=newline, 2163 encoding=encoding) 2164 if do_reads: 2165 got_lines = [] 2166 while True: 2167 c2 = textio.read(2) 2168 if c2 == '': 2169 break 2170 self.assertEqual(len(c2), 2) 2171 got_lines.append(c2 + textio.readline()) 2172 else: 2173 got_lines = list(textio) 2174 2175 for got_line, exp_line in zip(got_lines, exp_lines): 2176 self.assertEqual(got_line, exp_line) 2177 self.assertEqual(len(got_lines), len(exp_lines)) 2178 2179 def test_newlines_input(self): 2180 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 2181 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 2182 for newline, expected in [ 2183 (None, normalized.decode("ascii").splitlines(True)), 2184 ("", testdata.decode("ascii").splitlines(True)), 2185 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2186 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2187 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 2188 ]: 2189 buf = self.BytesIO(testdata) 2190 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2191 self.assertEqual(txt.readlines(), expected) 2192 txt.seek(0) 2193 self.assertEqual(txt.read(), "".join(expected)) 2194 2195 def test_newlines_output(self): 2196 testdict = { 2197 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2198 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2199 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2200 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2201 } 2202 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 
def test_destructor(self):
    """Dropping the last reference to a wrapper flushes pending text and
    closes the underlying stream."""
    closed_with = []
    bytesio_cls = self.BytesIO

    class RecordingBytesIO(bytesio_cls):
        # Remember what had been written at the moment close() is called.
        def close(self):
            closed_with.append(self.getvalue())
            bytesio_cls.close(self)

    stream = RecordingBytesIO()
    wrapper = self.TextIOWrapper(stream, encoding="ascii")
    wrapper.write("abc")
    # The wrapper must be the only thing keeping itself alive here.
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], closed_with)
def test_basic_io(self):
    """Write, read, seek and tell on a small text file.

    Runs for several chunk sizes and encodings; for the UTF encodings the
    same open file is additionally handed to multi_line_test().
    """
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            # Context managers close the file even when an assertion
            # fails, so no handle to TESTFN is left open behind us.
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position just past "abc"; reused as the end-of-file mark.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back from the cookie.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
def test_seeking(self):
    """tell() after a partial read, followed by readline() of a line that
    ends in a multi-byte character.

    The ASCII prefix is two characters shorter than the default chunk
    size -- presumably so that the 3-byte UTF-8 character following it
    straddles a chunk boundary (TODO confirm against TextIOWrapper).
    """
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # The reader used to be left open; `with` closes it even when an
    # assertion fails.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
2370 # Make test faster by doing smaller seeks 2371 CHUNK_SIZE = 128 2372 2373 def test_seek_and_tell_with_data(data, min_pos=0): 2374 """Tell/seek to various points within a data stream and ensure 2375 that the decoded data returned by read() is consistent.""" 2376 f = self.open(support.TESTFN, 'wb') 2377 f.write(data) 2378 f.close() 2379 f = self.open(support.TESTFN, encoding='test_decoder') 2380 f._CHUNK_SIZE = CHUNK_SIZE 2381 decoded = f.read() 2382 f.close() 2383 2384 for i in range(min_pos, len(decoded) + 1): # seek positions 2385 for j in [1, 5, len(decoded) - i]: # read lengths 2386 f = self.open(support.TESTFN, encoding='test_decoder') 2387 self.assertEqual(f.read(i), decoded[:i]) 2388 cookie = f.tell() 2389 self.assertEqual(f.read(j), decoded[i:i + j]) 2390 f.seek(cookie) 2391 self.assertEqual(f.read(), decoded[i:]) 2392 f.close() 2393 2394 # Enable the test decoder. 2395 StatefulIncrementalDecoder.codecEnabled = 1 2396 2397 # Run the tests. 2398 try: 2399 # Try each test case. 2400 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2401 test_seek_and_tell_with_data(input) 2402 2403 # Position each test case so that it crosses a chunk boundary. 2404 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 2405 offset = CHUNK_SIZE - len(input)//2 2406 prefix = b'.'*offset 2407 # Don't bother seeking into the prefix (takes too long). 2408 min_pos = offset*2 2409 test_seek_and_tell_with_data(prefix + input, min_pos) 2410 2411 # Ensure our test decoder won't interfere with subsequent tests. 2412 finally: 2413 StatefulIncrementalDecoder.codecEnabled = 0 2414 2415 def test_encoded_writes(self): 2416 data = "1234567890" 2417 tests = ("utf-16", 2418 "utf-16-le", 2419 "utf-16-be", 2420 "utf-32", 2421 "utf-32-le", 2422 "utf-32-be") 2423 for encoding in tests: 2424 buf = self.BytesIO() 2425 f = self.TextIOWrapper(buf, encoding=encoding) 2426 # Check if the BOM is written only once (see issue1753). 
2427 f.write(data) 2428 f.write(data) 2429 f.seek(0) 2430 self.assertEqual(f.read(), data * 2) 2431 f.seek(0) 2432 self.assertEqual(f.read(), data * 2) 2433 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 2434 2435 def test_unreadable(self): 2436 class UnReadable(self.BytesIO): 2437 def readable(self): 2438 return False 2439 txt = self.TextIOWrapper(UnReadable()) 2440 self.assertRaises(IOError, txt.read) 2441 2442 def test_read_one_by_one(self): 2443 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) 2444 reads = "" 2445 while True: 2446 c = txt.read(1) 2447 if not c: 2448 break 2449 reads += c 2450 self.assertEqual(reads, "AA\nBB") 2451 2452 def test_readlines(self): 2453 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) 2454 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 2455 txt.seek(0) 2456 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 2457 txt.seek(0) 2458 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 2459 2460 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 2461 def test_read_by_chunk(self): 2462 # make sure "\r\n" straddles 128 char boundary. 
2463 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) 2464 reads = "" 2465 while True: 2466 c = txt.read(128) 2467 if not c: 2468 break 2469 reads += c 2470 self.assertEqual(reads, "A"*127+"\nB") 2471 2472 def test_writelines(self): 2473 l = ['ab', 'cd', 'ef'] 2474 buf = self.BytesIO() 2475 txt = self.TextIOWrapper(buf) 2476 txt.writelines(l) 2477 txt.flush() 2478 self.assertEqual(buf.getvalue(), b'abcdef') 2479 2480 def test_writelines_userlist(self): 2481 l = UserList(['ab', 'cd', 'ef']) 2482 buf = self.BytesIO() 2483 txt = self.TextIOWrapper(buf) 2484 txt.writelines(l) 2485 txt.flush() 2486 self.assertEqual(buf.getvalue(), b'abcdef') 2487 2488 def test_writelines_error(self): 2489 txt = self.TextIOWrapper(self.BytesIO()) 2490 self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) 2491 self.assertRaises(TypeError, txt.writelines, None) 2492 self.assertRaises(TypeError, txt.writelines, b'abc') 2493 2494 def test_issue1395_1(self): 2495 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2496 2497 # read one char at a time 2498 reads = "" 2499 while True: 2500 c = txt.read(1) 2501 if not c: 2502 break 2503 reads += c 2504 self.assertEqual(reads, self.normalized) 2505 2506 def test_issue1395_2(self): 2507 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2508 txt._CHUNK_SIZE = 4 2509 2510 reads = "" 2511 while True: 2512 c = txt.read(4) 2513 if not c: 2514 break 2515 reads += c 2516 self.assertEqual(reads, self.normalized) 2517 2518 def test_issue1395_3(self): 2519 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2520 txt._CHUNK_SIZE = 4 2521 2522 reads = txt.read(4) 2523 reads += txt.read(4) 2524 reads += txt.readline() 2525 reads += txt.readline() 2526 reads += txt.readline() 2527 self.assertEqual(reads, self.normalized) 2528 2529 def test_issue1395_4(self): 2530 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2531 txt._CHUNK_SIZE = 4 2532 2533 reads = 
txt.read(4) 2534 reads += txt.read() 2535 self.assertEqual(reads, self.normalized) 2536 2537 def test_issue1395_5(self): 2538 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2539 txt._CHUNK_SIZE = 4 2540 2541 reads = txt.read(4) 2542 pos = txt.tell() 2543 txt.seek(0) 2544 txt.seek(pos) 2545 self.assertEqual(txt.read(4), "BBB\n") 2546 2547 def test_issue2282(self): 2548 buffer = self.BytesIO(self.testdata) 2549 txt = self.TextIOWrapper(buffer, encoding="ascii") 2550 2551 self.assertEqual(buffer.seekable(), txt.seekable()) 2552 2553 def test_append_bom(self): 2554 # The BOM is not written again when appending to a non-empty file 2555 filename = support.TESTFN 2556 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2557 with self.open(filename, 'w', encoding=charset) as f: 2558 f.write('aaa') 2559 pos = f.tell() 2560 with self.open(filename, 'rb') as f: 2561 self.assertEqual(f.read(), 'aaa'.encode(charset)) 2562 2563 with self.open(filename, 'a', encoding=charset) as f: 2564 f.write('xxx') 2565 with self.open(filename, 'rb') as f: 2566 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 2567 2568 def test_seek_bom(self): 2569 # Same test, but when seeking manually 2570 filename = support.TESTFN 2571 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 2572 with self.open(filename, 'w', encoding=charset) as f: 2573 f.write('aaa') 2574 pos = f.tell() 2575 with self.open(filename, 'r+', encoding=charset) as f: 2576 f.seek(pos) 2577 f.write('zzz') 2578 f.seek(0) 2579 f.write('bbb') 2580 with self.open(filename, 'rb') as f: 2581 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 2582 2583 def test_errors_property(self): 2584 with self.open(support.TESTFN, "w") as f: 2585 self.assertEqual(f.errors, "strict") 2586 with self.open(support.TESTFN, "w", errors="replace") as f: 2587 self.assertEqual(f.errors, "replace") 2588 2589 @unittest.skipUnless(threading, 'Threading required for this test.') 2590 def test_threads_write(self): 2591 # Issue6750: 
concurrent writes could duplicate data 2592 event = threading.Event() 2593 with self.open(support.TESTFN, "w", buffering=1) as f: 2594 def run(n): 2595 text = "Thread%03d\n" % n 2596 event.wait() 2597 f.write(text) 2598 threads = [threading.Thread(target=run, args=(x,)) 2599 for x in range(20)] 2600 with support.start_threads(threads, event.set): 2601 time.sleep(0.02) 2602 with self.open(support.TESTFN) as f: 2603 content = f.read() 2604 for n in range(20): 2605 self.assertEqual(content.count("Thread%03d\n" % n), 1) 2606 2607 def test_flush_error_on_close(self): 2608 # Test that text file is closed despite failed flush 2609 # and that flush() is called before file closed. 2610 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2611 closed = [] 2612 def bad_flush(): 2613 closed[:] = [txt.closed, txt.buffer.closed] 2614 raise IOError() 2615 txt.flush = bad_flush 2616 self.assertRaises(IOError, txt.close) # exception not swallowed 2617 self.assertTrue(txt.closed) 2618 self.assertTrue(txt.buffer.closed) 2619 self.assertTrue(closed) # flush() called 2620 self.assertFalse(closed[0]) # flush() called before file closed 2621 self.assertFalse(closed[1]) 2622 txt.flush = lambda: None # break reference loop 2623 2624 def test_multi_close(self): 2625 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2626 txt.close() 2627 txt.close() 2628 txt.close() 2629 self.assertRaises(ValueError, txt.flush) 2630 2631 def test_readonly_attributes(self): 2632 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 2633 buf = self.BytesIO(self.testdata) 2634 with self.assertRaises((AttributeError, TypeError)): 2635 txt.buffer = buf 2636 2637 def test_read_nonbytes(self): 2638 # Issue #17106 2639 # Crash when underlying read() returns non-bytes 2640 class NonbytesStream(self.StringIO): 2641 read1 = self.StringIO.read 2642 class NonbytesStream(self.StringIO): 2643 read1 = self.StringIO.read 2644 t = 
def test_illegal_decoder(self):
    """Issue #17106: reading through a decoder that returns a non-string
    must not crash.

    For each read call a fresh wrapper is built with the 'quopri_codec'
    encoding (inside check_py3k_warnings()), and the call is made under
    self.maybeRaises(TypeError).
    """
    # Crash when decoder returns non-string
    reads = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, args in reads:
        with support.check_py3k_warnings():
            t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
                                   encoding='quopri_codec')
        with self.maybeRaises(TypeError):
            getattr(t, method_name)(*args)
def check_newline_decoding_utf8(self, decoder):
    """Drive *decoder* through a fixed sequence of UTF-8 inputs.

    The calls are stateful and order-dependent: several of them rely on
    bytes or a carriage return left pending by the previous call.
    """
    # UTF-8 specific tests for a newline decoder
    def _check_decode(b, s, **kwargs):
        # We exercise getstate() / setstate() as well as decode()
        # The same input is decoded twice from the same saved state and
        # must give the same result both times.
        state = decoder.getstate()
        self.assertEqual(decoder.decode(b, **kwargs), s)
        decoder.setstate(state)
        self.assertEqual(decoder.decode(b, **kwargs), s)

    # A complete three-byte sequence decodes in one step.
    _check_decode(b'\xe8\xa2\x88', "\u8888")

    # Fed one byte at a time, nothing is produced until the last byte.
    _check_decode(b'\xe8', "")
    _check_decode(b'\xa2', "")
    _check_decode(b'\x88', "\u8888")

    # Same again, starting from the state left by the previous run.
    _check_decode(b'\xe8', "")
    _check_decode(b'\xa2', "")
    _check_decode(b'\x88', "\u8888")

    # A sequence cut short by final=True is a decoding error.
    _check_decode(b'\xe8', "")
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    _check_decode(b'\n', "\n")
    # A lone "\r" yields no output until more input (or final=True) arrives.
    _check_decode(b'\r', "")
    _check_decode(b'', "\n", final=True)
    _check_decode(b'\r', "\n", final=True)

    # A pending "\r" followed by an ordinary character comes out as "\n".
    _check_decode(b'\r', "")
    _check_decode(b'a', "\na")

    # "\r" then "\r\n" gives two newlines; "\r" + "\r" + "\n" gives two
    # as well, the last pair combining into one.
    _check_decode(b'\r\r\n', "\n\n")
    _check_decode(b'\r', "")
    _check_decode(b'\r', "\n")
    _check_decode(b'\na', "\na")

    # Multi-byte characters mixed with newline sequences.
    _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
    _check_decode(b'\xe8\xa2\x88', "\u8888")
    _check_decode(b'\n', "\n")
    _check_decode(b'\xe8\xa2\x88\r', "\u8888")
    _check_decode(b'\n', "\n")
self.assertEqual(decoder.newlines, None) 2819 2820 def test_newline_decoder(self): 2821 encodings = ( 2822 # None meaning the IncrementalNewlineDecoder takes unicode input 2823 # rather than bytes input 2824 None, 'utf-8', 'latin-1', 2825 'utf-16', 'utf-16-le', 'utf-16-be', 2826 'utf-32', 'utf-32-le', 'utf-32-be', 2827 ) 2828 for enc in encodings: 2829 decoder = enc and codecs.getincrementaldecoder(enc)() 2830 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 2831 self.check_newline_decoding(decoder, enc) 2832 decoder = codecs.getincrementaldecoder("utf-8")() 2833 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 2834 self.check_newline_decoding_utf8(decoder) 2835 2836 def test_newline_bytes(self): 2837 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 2838 def _check(dec): 2839 self.assertEqual(dec.newlines, None) 2840 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 2841 self.assertEqual(dec.newlines, None) 2842 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 2843 self.assertEqual(dec.newlines, None) 2844 dec = self.IncrementalNewlineDecoder(None, translate=False) 2845 _check(dec) 2846 dec = self.IncrementalNewlineDecoder(None, translate=True) 2847 _check(dec) 2848 2849class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 2850 pass 2851 2852class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 2853 pass 2854 2855 2856# XXX Tests for open() 2857 2858class MiscIOTest(unittest.TestCase): 2859 2860 def tearDown(self): 2861 support.unlink(support.TESTFN) 2862 2863 def test___all__(self): 2864 for name in self.io.__all__: 2865 obj = getattr(self.io, name, None) 2866 self.assertIsNotNone(obj, name) 2867 if name == "open": 2868 continue 2869 elif "error" in name.lower() or name == "UnsupportedOperation": 2870 self.assertTrue(issubclass(obj, Exception), name) 2871 elif not name.startswith("SEEK_"): 2872 self.assertTrue(issubclass(obj, self.IOBase)) 2873 2874 def test_attributes(self): 2875 f = 
self.open(support.TESTFN, "wb", buffering=0) 2876 self.assertEqual(f.mode, "wb") 2877 f.close() 2878 2879 f = self.open(support.TESTFN, "U") 2880 self.assertEqual(f.name, support.TESTFN) 2881 self.assertEqual(f.buffer.name, support.TESTFN) 2882 self.assertEqual(f.buffer.raw.name, support.TESTFN) 2883 self.assertEqual(f.mode, "U") 2884 self.assertEqual(f.buffer.mode, "rb") 2885 self.assertEqual(f.buffer.raw.mode, "rb") 2886 f.close() 2887 2888 f = self.open(support.TESTFN, "w+") 2889 self.assertEqual(f.mode, "w+") 2890 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 2891 self.assertEqual(f.buffer.raw.mode, "rb+") 2892 2893 g = self.open(f.fileno(), "wb", closefd=False) 2894 self.assertEqual(g.mode, "wb") 2895 self.assertEqual(g.raw.mode, "wb") 2896 self.assertEqual(g.name, f.fileno()) 2897 self.assertEqual(g.raw.name, f.fileno()) 2898 f.close() 2899 g.close() 2900 2901 def test_io_after_close(self): 2902 for kwargs in [ 2903 {"mode": "w"}, 2904 {"mode": "wb"}, 2905 {"mode": "w", "buffering": 1}, 2906 {"mode": "w", "buffering": 2}, 2907 {"mode": "wb", "buffering": 0}, 2908 {"mode": "r"}, 2909 {"mode": "rb"}, 2910 {"mode": "r", "buffering": 1}, 2911 {"mode": "r", "buffering": 2}, 2912 {"mode": "rb", "buffering": 0}, 2913 {"mode": "w+"}, 2914 {"mode": "w+b"}, 2915 {"mode": "w+", "buffering": 1}, 2916 {"mode": "w+", "buffering": 2}, 2917 {"mode": "w+b", "buffering": 0}, 2918 ]: 2919 f = self.open(support.TESTFN, **kwargs) 2920 f.close() 2921 self.assertRaises(ValueError, f.flush) 2922 self.assertRaises(ValueError, f.fileno) 2923 self.assertRaises(ValueError, f.isatty) 2924 self.assertRaises(ValueError, f.__iter__) 2925 if hasattr(f, "peek"): 2926 self.assertRaises(ValueError, f.peek, 1) 2927 self.assertRaises(ValueError, f.read) 2928 if hasattr(f, "read1"): 2929 self.assertRaises(ValueError, f.read1, 1024) 2930 if hasattr(f, "readall"): 2931 self.assertRaises(ValueError, f.readall) 2932 if hasattr(f, "readinto"): 2933 self.assertRaises(ValueError, 
f.readinto, bytearray(1024)) 2934 self.assertRaises(ValueError, f.readline) 2935 self.assertRaises(ValueError, f.readlines) 2936 self.assertRaises(ValueError, f.seek, 0) 2937 self.assertRaises(ValueError, f.tell) 2938 self.assertRaises(ValueError, f.truncate) 2939 self.assertRaises(ValueError, f.write, 2940 b"" if "b" in kwargs['mode'] else "") 2941 self.assertRaises(ValueError, f.writelines, []) 2942 self.assertRaises(ValueError, next, f) 2943 2944 def test_blockingioerror(self): 2945 # Various BlockingIOError issues 2946 self.assertRaises(TypeError, self.BlockingIOError) 2947 self.assertRaises(TypeError, self.BlockingIOError, 1) 2948 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) 2949 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) 2950 b = self.BlockingIOError(1, "") 2951 self.assertEqual(b.characters_written, 0) 2952 class C(unicode): 2953 pass 2954 c = C("") 2955 b = self.BlockingIOError(1, c) 2956 c.b = b 2957 b.c = c 2958 wr = weakref.ref(c) 2959 del c, b 2960 support.gc_collect() 2961 self.assertIsNone(wr(), wr) 2962 2963 def test_abcs(self): 2964 # Test the visible base classes are ABCs. 
2965 self.assertIsInstance(self.IOBase, abc.ABCMeta) 2966 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 2967 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 2968 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 2969 2970 def _check_abc_inheritance(self, abcmodule): 2971 with self.open(support.TESTFN, "wb", buffering=0) as f: 2972 self.assertIsInstance(f, abcmodule.IOBase) 2973 self.assertIsInstance(f, abcmodule.RawIOBase) 2974 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 2975 self.assertNotIsInstance(f, abcmodule.TextIOBase) 2976 with self.open(support.TESTFN, "wb") as f: 2977 self.assertIsInstance(f, abcmodule.IOBase) 2978 self.assertNotIsInstance(f, abcmodule.RawIOBase) 2979 self.assertIsInstance(f, abcmodule.BufferedIOBase) 2980 self.assertNotIsInstance(f, abcmodule.TextIOBase) 2981 with self.open(support.TESTFN, "w") as f: 2982 self.assertIsInstance(f, abcmodule.IOBase) 2983 self.assertNotIsInstance(f, abcmodule.RawIOBase) 2984 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 2985 self.assertIsInstance(f, abcmodule.TextIOBase) 2986 2987 def test_abc_inheritance(self): 2988 # Test implementations inherit from their respective ABCs 2989 self._check_abc_inheritance(self) 2990 2991 def test_abc_inheritance_official(self): 2992 # Test implementations inherit from the official ABCs of the 2993 # baseline "io" module. 
2994 self._check_abc_inheritance(io) 2995 2996 @unittest.skipUnless(fcntl, 'fcntl required for this test') 2997 def test_nonblock_pipe_write_bigbuf(self): 2998 self._test_nonblock_pipe_write(16*1024) 2999 3000 @unittest.skipUnless(fcntl, 'fcntl required for this test') 3001 def test_nonblock_pipe_write_smallbuf(self): 3002 self._test_nonblock_pipe_write(1024) 3003 3004 def _set_non_blocking(self, fd): 3005 flags = fcntl.fcntl(fd, fcntl.F_GETFL) 3006 self.assertNotEqual(flags, -1) 3007 res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) 3008 self.assertEqual(res, 0) 3009 3010 def _test_nonblock_pipe_write(self, bufsize): 3011 sent = [] 3012 received = [] 3013 r, w = os.pipe() 3014 self._set_non_blocking(r) 3015 self._set_non_blocking(w) 3016 3017 # To exercise all code paths in the C implementation we need 3018 # to play with buffer sizes. For instance, if we choose a 3019 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 3020 # then we will never get a partial write of the buffer. 
3021 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 3022 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 3023 3024 with rf, wf: 3025 for N in 9999, 73, 7574: 3026 try: 3027 i = 0 3028 while True: 3029 msg = bytes([i % 26 + 97]) * N 3030 sent.append(msg) 3031 wf.write(msg) 3032 i += 1 3033 3034 except self.BlockingIOError as e: 3035 self.assertEqual(e.args[0], errno.EAGAIN) 3036 sent[-1] = sent[-1][:e.characters_written] 3037 received.append(rf.read()) 3038 msg = b'BLOCKED' 3039 wf.write(msg) 3040 sent.append(msg) 3041 3042 while True: 3043 try: 3044 wf.flush() 3045 break 3046 except self.BlockingIOError as e: 3047 self.assertEqual(e.args[0], errno.EAGAIN) 3048 self.assertEqual(e.characters_written, 0) 3049 received.append(rf.read()) 3050 3051 received += iter(rf.read, None) 3052 3053 sent, received = b''.join(sent), b''.join(received) 3054 self.assertEqual(sent, received) 3055 self.assertTrue(wf.closed) 3056 self.assertTrue(rf.closed) 3057 3058class CMiscIOTest(MiscIOTest): 3059 io = io 3060 shutdown_error = "RuntimeError: could not find io module state" 3061 3062class PyMiscIOTest(MiscIOTest): 3063 io = pyio 3064 shutdown_error = "LookupError: unknown encoding: ascii" 3065 3066 3067@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 3068class SignalsTest(unittest.TestCase): 3069 3070 def setUp(self): 3071 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 3072 3073 def tearDown(self): 3074 signal.signal(signal.SIGALRM, self.oldalrm) 3075 3076 def alarm_interrupt(self, sig, frame): 3077 1 // 0 3078 3079 @unittest.skipUnless(threading, 'Threading required for this test.') 3080 @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'), 3081 'issue #12429: skip test on FreeBSD <= 7') 3082 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 3083 """Check that a partial write, when it gets interrupted, properly 3084 invokes the signal handler, and bubbles up the exception raised 
3085 in the latter.""" 3086 read_results = [] 3087 def _read(): 3088 s = os.read(r, 1) 3089 read_results.append(s) 3090 t = threading.Thread(target=_read) 3091 t.daemon = True 3092 r, w = os.pipe() 3093 try: 3094 wio = self.io.open(w, **fdopen_kwargs) 3095 t.start() 3096 signal.alarm(1) 3097 # Fill the pipe enough that the write will be blocking. 3098 # It will be interrupted by the timer armed above. Since the 3099 # other thread has read one byte, the low-level write will 3100 # return with a successful (partial) result rather than an EINTR. 3101 # The buffered IO layer must check for pending signal 3102 # handlers, which in this case will invoke alarm_interrupt(). 3103 try: 3104 with self.assertRaises(ZeroDivisionError): 3105 wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1)) 3106 finally: 3107 t.join() 3108 # We got one byte, get another one and check that it isn't a 3109 # repeat of the first one. 3110 read_results.append(os.read(r, 1)) 3111 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 3112 finally: 3113 os.close(w) 3114 os.close(r) 3115 # This is deliberate. If we didn't close the file descriptor 3116 # before closing wio, wio would try to flush its internal 3117 # buffer, and block again. 
3118 try: 3119 wio.close() 3120 except IOError as e: 3121 if e.errno != errno.EBADF: 3122 raise 3123 3124 def test_interrupted_write_unbuffered(self): 3125 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 3126 3127 def test_interrupted_write_buffered(self): 3128 self.check_interrupted_write(b"xy", b"xy", mode="wb") 3129 3130 def test_interrupted_write_text(self): 3131 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 3132 3133 def check_reentrant_write(self, data, **fdopen_kwargs): 3134 def on_alarm(*args): 3135 # Will be called reentrantly from the same thread 3136 wio.write(data) 3137 1//0 3138 signal.signal(signal.SIGALRM, on_alarm) 3139 r, w = os.pipe() 3140 wio = self.io.open(w, **fdopen_kwargs) 3141 try: 3142 signal.alarm(1) 3143 # Either the reentrant call to wio.write() fails with RuntimeError, 3144 # or the signal handler raises ZeroDivisionError. 3145 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 3146 while 1: 3147 for i in range(100): 3148 wio.write(data) 3149 wio.flush() 3150 # Make sure the buffer doesn't fill up and block further writes 3151 os.read(r, len(data) * 100) 3152 exc = cm.exception 3153 if isinstance(exc, RuntimeError): 3154 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 3155 finally: 3156 wio.close() 3157 os.close(r) 3158 3159 def test_reentrant_write_buffered(self): 3160 self.check_reentrant_write(b"xy", mode="wb") 3161 3162 def test_reentrant_write_text(self): 3163 self.check_reentrant_write("xy", mode="w", encoding="ascii") 3164 3165 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 3166 """Check that a buffered read, when it gets interrupted (either 3167 returning a partial result or EINTR), properly invokes the signal 3168 handler and retries if the latter returned successfully.""" 3169 r, w = os.pipe() 3170 fdopen_kwargs["closefd"] = False 3171 def alarm_handler(sig, frame): 3172 os.write(w, b"bar") 3173 signal.signal(signal.SIGALRM, 
alarm_handler) 3174 try: 3175 rio = self.io.open(r, **fdopen_kwargs) 3176 os.write(w, b"foo") 3177 signal.alarm(1) 3178 # Expected behaviour: 3179 # - first raw read() returns partial b"foo" 3180 # - second raw read() returns EINTR 3181 # - third raw read() returns b"bar" 3182 self.assertEqual(decode(rio.read(6)), "foobar") 3183 finally: 3184 rio.close() 3185 os.close(w) 3186 os.close(r) 3187 3188 def test_interrupterd_read_retry_buffered(self): 3189 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 3190 mode="rb") 3191 3192 def test_interrupterd_read_retry_text(self): 3193 self.check_interrupted_read_retry(lambda x: x, 3194 mode="r") 3195 3196 @unittest.skipUnless(threading, 'Threading required for this test.') 3197 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 3198 """Check that a buffered write, when it gets interrupted (either 3199 returning a partial result or EINTR), properly invokes the signal 3200 handler and retries if the latter returned successfully.""" 3201 select = support.import_module("select") 3202 # A quantity that exceeds the buffer size of an anonymous pipe's 3203 # write end. 3204 N = support.PIPE_MAX_SIZE 3205 r, w = os.pipe() 3206 fdopen_kwargs["closefd"] = False 3207 # We need a separate thread to read from the pipe and allow the 3208 # write() to finish. This thread is started after the SIGALRM is 3209 # received (forcing a first EINTR in write()). 
3210 read_results = [] 3211 write_finished = False 3212 error = [None] 3213 def _read(): 3214 try: 3215 while not write_finished: 3216 while r in select.select([r], [], [], 1.0)[0]: 3217 s = os.read(r, 1024) 3218 read_results.append(s) 3219 except BaseException as exc: 3220 error[0] = exc 3221 t = threading.Thread(target=_read) 3222 t.daemon = True 3223 def alarm1(sig, frame): 3224 signal.signal(signal.SIGALRM, alarm2) 3225 signal.alarm(1) 3226 def alarm2(sig, frame): 3227 t.start() 3228 signal.signal(signal.SIGALRM, alarm1) 3229 try: 3230 wio = self.io.open(w, **fdopen_kwargs) 3231 signal.alarm(1) 3232 # Expected behaviour: 3233 # - first raw write() is partial (because of the limited pipe buffer 3234 # and the first alarm) 3235 # - second raw write() returns EINTR (because of the second alarm) 3236 # - subsequent write()s are successful (either partial or complete) 3237 self.assertEqual(N, wio.write(item * N)) 3238 wio.flush() 3239 write_finished = True 3240 t.join() 3241 3242 self.assertIsNone(error[0]) 3243 self.assertEqual(N, sum(len(x) for x in read_results)) 3244 finally: 3245 write_finished = True 3246 os.close(w) 3247 os.close(r) 3248 # This is deliberate. If we didn't close the file descriptor 3249 # before closing wio, wio would try to flush its internal 3250 # buffer, and could block (in case of failure). 3251 try: 3252 wio.close() 3253 except IOError as e: 3254 if e.errno != errno.EBADF: 3255 raise 3256 3257 def test_interrupterd_write_retry_buffered(self): 3258 self.check_interrupted_write_retry(b"x", mode="wb") 3259 3260 def test_interrupterd_write_retry_text(self): 3261 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 3262 3263 3264class CSignalsTest(SignalsTest): 3265 io = io 3266 3267class PySignalsTest(SignalsTest): 3268 io = pyio 3269 3270 # Handling reentrancy issues would slow down _pyio even more, so the 3271 # tests are disabled. 
3272 test_reentrant_write_buffered = None 3273 test_reentrant_write_text = None 3274 3275 3276def test_main(): 3277 tests = (CIOTest, PyIOTest, 3278 CBufferedReaderTest, PyBufferedReaderTest, 3279 CBufferedWriterTest, PyBufferedWriterTest, 3280 CBufferedRWPairTest, PyBufferedRWPairTest, 3281 CBufferedRandomTest, PyBufferedRandomTest, 3282 StatefulIncrementalDecoderTest, 3283 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 3284 CTextIOWrapperTest, PyTextIOWrapperTest, 3285 CMiscIOTest, PyMiscIOTest, 3286 CSignalsTest, PySignalsTest, 3287 ) 3288 3289 # Put the namespaces of the IO module we are testing and some useful mock 3290 # classes in the __dict__ of each test. 3291 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 3292 MockNonBlockWriterIO, MockRawIOWithoutRead) 3293 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 3294 c_io_ns = dict((name, getattr(io, name)) for name in all_members) 3295 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members) 3296 globs = globals() 3297 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 3298 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 3299 # Avoid turning open into a bound method. 3300 py_io_ns["open"] = pyio.OpenWrapper 3301 for test in tests: 3302 if test.__name__.startswith("C"): 3303 for name, obj in c_io_ns.items(): 3304 setattr(test, name, obj) 3305 elif test.__name__.startswith("Py"): 3306 for name, obj in py_io_ns.items(): 3307 setattr(test, name, obj) 3308 3309 support.run_unittest(*tests) 3310 3311if __name__ == "__main__": 3312 test_main() 3313