"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################

from __future__ import print_function
from __future__ import unicode_literals

import os
import sys
import time
import array
import random
import unittest
import weakref
import abc
import signal
import errno
from itertools import cycle, count
from collections import deque
from UserList import UserList
from test import test_support as support
import contextlib

import codecs
import io  # C implementation of io
import _pyio as pyio # Python implementation of io
try:
    import threading
except ImportError:
    threading = None
try:
    import fcntl
except ImportError:
    fcntl = None

# Make every class defined in this module a new-style class (Python 2).
__metaclass__ = type
# Rebind the module-level name `bytes` to the helper exported by test_support.
bytes = support.py3k_bytes

def _default_chunk_size():
    """Get the default TextIOWrapper chunk size"""
    # Opening this very file in text mode yields a text-layer object whose
    # private _CHUNK_SIZE attribute is read back.
    with io.open(__file__, "r", encoding="latin1") as f:
        return f._CHUNK_SIZE
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Chunks handed out by readinto(), in order.  A None entry makes the
        # corresponding call return None.
        self._read_stack = list(read_stack)
        # Every payload passed to write(), in call order.
        self._write_stack = []
        # Number of read attempts, and how many of them found nothing queued.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary constant; tests only compare against it.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes stored, None when the next queued entry
        is None, or 0 when nothing is queued (counted as an extraneous read).
        A chunk longer than *buf* is split; the remainder stays queued.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Chunk does not fit: fill the buffer and keep the tail for later.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos

class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass

class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass


class MockRawIO(MockRawIOWithoutRead):
    """Mock raw stream whose read() hands out the queued chunks whole."""

    def read(self, n=None):
        """Return the next queued entry (ignoring *n*), or b"" when empty."""
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue counts as an extraneous read; any other
            # failure is a real error and must propagate.
            self._extraneous_reads += 1
            return b""

class CMockRawIO(MockRawIO, io.RawIOBase):
    pass

class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass


class MisbehavedRawIO(MockRawIO):
    """Mock raw stream that deliberately reports inconsistent results."""

    def write(self, b):
        # Claims twice as many bytes as were actually recorded.
        return MockRawIO.write(self, b) * 2

    def read(self, n=None):
        # Returns the queued chunk repeated twice.
        return MockRawIO.read(self, n) * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Claims five times the buffer size regardless of what was stored.
        MockRawIO.readinto(self, buf)
        return len(buf) * 5

class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass

class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass


class CloseFailureIO(MockRawIO):
    """Mock raw stream whose first close() raises IOError."""

    closed = 0

    def close(self):
        # The flag is set before raising, so later close() calls are silent.
        if not self.closed:
            self.closed = 1
            raise IOError

class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass

class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass


class MockFileIO:
    """Mixin that logs the result size of every read()/readinto() call.

    It is combined below with a BytesIO base, which supplies the real
    read()/readinto() reached through super().
    """

    def __init__(self, data):
        # One entry per call: byte count, or None when read() returned None.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        res = super(MockFileIO, self).read(n)
        self.read_history.append(None if res is None else len(res))
        return res

    def readinto(self, b):
        res = super(MockFileIO, self).readinto(b)
        self.read_history.append(res)
        return res

class CMockFileIO(MockFileIO, io.BytesIO):
    pass

class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass


class MockNonBlockWriterIO:
    """Mock raw writer that can simulate a write that would block."""

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and clear the record."""
        s = b"".join(self._write_stack)
        self._write_stack[:] = []
        return s

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b*, stopping short at an armed blocker character.

        Returns the number of bytes accepted, or None when the payload
        starts with the blocker (the blocker is then disarmed).
        """
        b = bytes(b)
        n = -1
        if self._blocker_char:
            try:
                n = b.index(self._blocker_char)
            except ValueError:
                pass
            else:
                if n > 0:
                    # write data up to the first blocker
                    self._write_stack.append(b[:n])
                    return n
                else:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
        self._write_stack.append(b)
        return len(b)

class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError

class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
class IOTest(unittest.TestCase):
    # Base test case for file objects and io base classes.  The methods use
    # self.open, self.FileIO, self.BytesIO, self.IOBase, etc., which are not
    # defined in the visible part of this class (presumably supplied per
    # implementation elsewhere in the file -- TODO confirm).

    def setUp(self):
        # Make sure no scratch file is left over from a previous test.
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared helper: run a fixed write/seek/tell/truncate sequence on the
        # writable object f and check every return value.
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() is expected to leave the position where it was.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        # Float offsets must be rejected.
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared helper: read back the b"hello world\n" content through
        # read(), readinto() and seek().  With buffered=True, argument-less
        # read() calls are checked as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only 2 bytes remain; the buffer keeps its length of 5.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")
    # Offset (2**31 bytes) at which large_file_ops() starts writing.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Shared helper: seek/write/truncate around the LARGE offset on a
        # file object that is both readable and writable.
        assert f.readable()
        assert f.writable()
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        # Truncating to an explicit size must not move the position.
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        # Unbuffered (buffering=0) binary file: capability flags, then the
        # shared write/read sequences.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        # Same as test_raw_file_io but with default buffering.
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)
    def test_readline(self):
        # readline() with no limit, with limits, with None, and with an
        # invalid (float) limit, on binary and text files.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(None), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        # Run the shared write/read sequences against an in-memory BytesIO.
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Once unbuffered, once with default buffering.
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)
    def test_with_open(self):
        # The file must be closed on leaving the with block, both on normal
        # exit and when the body raises.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1 // 0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1 // 0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        # In append mode tell() must report the end of the existing content.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Destroying a FileIO subclass must run __del__ (1), then close (2),
        # then flush (3), and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # Chain to the base __del__ only if the base defines one.
                try:
                    f = super(MyFileIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyFileIO, self).close()
            def flush(self):
                record.append(3)
                super(MyFileIO, self).flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Shared helper: a subclass of the given io base class must see
        # __del__, close and flush called in that order on destruction.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super(MyIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super(MyIO, self).close()
            def flush(self):
                record.append(self.on_flush)
                super(MyIO, self).flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
466 # (in the C version, close() is called by the tp_dealloc 467 # function, not by __del__) 468 self.on_del = 1 469 self.on_close = 2 470 self.on_flush = 3 471 def __del__(self): 472 record.append(self.on_del) 473 try: 474 f = super(MyIO, self).__del__ 475 except AttributeError: 476 pass 477 else: 478 f() 479 def close(self): 480 record.append(self.on_close) 481 super(MyIO, self).close() 482 def flush(self): 483 record.append(self.on_flush) 484 super(MyIO, self).flush() 485 f = MyIO() 486 del f 487 support.gc_collect() 488 self.assertEqual(record, [1, 2, 3]) 489 490 def test_IOBase_destructor(self): 491 self._check_base_destructor(self.IOBase) 492 493 def test_RawIOBase_destructor(self): 494 self._check_base_destructor(self.RawIOBase) 495 496 def test_BufferedIOBase_destructor(self): 497 self._check_base_destructor(self.BufferedIOBase) 498 499 def test_TextIOBase_destructor(self): 500 self._check_base_destructor(self.TextIOBase) 501 502 def test_close_flushes(self): 503 with self.open(support.TESTFN, "wb") as f: 504 f.write(b"xxx") 505 with self.open(support.TESTFN, "rb") as f: 506 self.assertEqual(f.read(), b"xxx") 507 508 def test_array_writes(self): 509 a = array.array(b'i', range(10)) 510 n = len(a.tostring()) 511 with self.open(support.TESTFN, "wb", 0) as f: 512 self.assertEqual(f.write(a), n) 513 with self.open(support.TESTFN, "wb") as f: 514 self.assertEqual(f.write(a), n) 515 516 def test_closefd(self): 517 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 518 closefd=False) 519 520 def test_read_closed(self): 521 with self.open(support.TESTFN, "w") as f: 522 f.write("egg\n") 523 with self.open(support.TESTFN, "r") as f: 524 file = self.open(f.fileno(), "r", closefd=False) 525 self.assertEqual(file.read(), "egg\n") 526 file.seek(0) 527 file.close() 528 self.assertRaises(ValueError, file.read) 529 530 def test_no_closefd_with_filename(self): 531 # can't use closefd in combination with a file name 532 self.assertRaises(ValueError, self.open, 
    def test_closefd_attr(self):
        # The raw layer's closefd attribute must reflect how the file was
        # opened.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Self-reference so that only the cyclic collector can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        # Raw, buffered and text layers must all fail with OverflowError.
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)

    def test_flush_error_on_close(self):
        # A flush() failure during close() must propagate, and the file
        # must still end up closed.
        f = self.open(support.TESTFN, "wb", buffering=0)
        def bad_flush():
            raise IOError()
        f.flush = bad_flush
        self.assertRaises(IOError, f.close) # exception not swallowed
        self.assertTrue(f.closed)

    def test_multi_close(self):
        # Repeated close() calls are allowed; flush() afterwards is not.
        f = self.open(support.TESTFN, "wb", buffering=0)
        f.close()
        f.close()
        f.close()
        self.assertRaises(ValueError, f.flush)

    def test_RawIOBase_read(self):
        # Exercise the default RawIOBase.read() implementation (which calls
        # readinto() internally).
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
        self.assertEqual(rawio.read(2), b"ab")
        self.assertEqual(rawio.read(2), b"c")
        self.assertEqual(rawio.read(2), b"d")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"ef")
        self.assertEqual(rawio.read(2), b"g")
        self.assertEqual(rawio.read(2), None)
        self.assertEqual(rawio.read(2), b"")
    def test_fileio_closefd(self):
        # Issue #4841
        with self.open(__file__, 'rb') as f1, \
             self.open(__file__, 'rb') as f2:
            fileio = self.FileIO(f1.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(f2.fileno(), closefd=False)
            f1.readline()
            # .close() must not close f2
            fileio.close()
            f2.readline()


class CIOTest(IOTest):
    # IOTest variant carrying a test specific to the C implementation.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        obj.obj = obj
        wr = weakref.ref(obj)
        del MyIO
        del obj
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
class PyIOTest(IOTest):
    # IOTest variant that skips test_array_writes, for the reason given in
    # the skip message.
    test_array_writes = unittest.skip(
        "len(array.array) returns number of elements rather than bytelength"
    )(IOTest.test_array_writes)


class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    # (self.tp is the buffered class under test; self.MockRawIO and friends
    # are expected to be provided by the concrete test classes).

    def test_detach(self):
        # detach() returns the raw stream once; a second call fails.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        # fileno() is forwarded to the raw stream (the mock returns 42).
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # Destroying a subclass must call __del__ then close, and also
        # flush when the object is writable.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super(MyBufferedIO, self).__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super(MyBufferedIO, self).close()
            def flush(self):
                record.append(3)
                super(MyBufferedIO, self).flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Query writability before the object is destroyed.
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)
    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # The temporary buffered object is dropped right after the
            # failing attribute lookup.
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        # repr() shows the class name, plus the raw stream's name once one
        # is set (unicode and bytes names are rendered differently).
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)

    def test_flush_error_on_close(self):
        # A raw flush() failure during close() must propagate, and the
        # buffered object must still end up closed.
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(IOError, b.close) # exception not swallowed
        self.assertTrue(b.closed)

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is the
        # one reported and the object is not marked closed.
        raw = self.MockRawIO()
        def bad_flush():
            raise IOError('flush')
        def bad_close():
            raise IOError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(IOError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertFalse(b.closed)

    def test_multi_close(self):
        # Repeated close() calls are allowed; flush() afterwards is not.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_readonly_attributes(self):
        # The raw attribute cannot be reassigned.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises((AttributeError, TypeError)):
            buf.raw = x
class SizeofTest:
    # Mixin checking that sys.getsizeof() accounts for the buffer size.

    @support.cpython_only
    def test_sizeof(self):
        bufsize1 = 4096
        bufsize2 = 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize1)
        # Size of the object excluding its buffer.
        size = sys.getsizeof(bufio) - bufsize1
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize2)
        self.assertEqual(sys.getsizeof(bufio), size + bufsize2)


class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Mode used by test_threads when reopening the scratch file.
    read_mode = "rb"

    def test_constructor(self):
        # __init__ may be called again with valid sizes; invalid sizes
        # raise ValueError and the object can be re-initialized afterwards.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_read(self):
        # read(None) and read(7) both gather the three raw chunks.
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() must issue at most one raw read per call; the raw read
        # counter is checked after every step.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
        # Invalid args
        self.assertRaises(ValueError, bufio.read1, -1)
    def test_readinto(self):
        # readinto() fills a 2-byte buffer across raw chunk boundaries; a
        # short final read leaves the rest of the buffer untouched.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")

    def test_readlines(self):
        # readlines() with no hint, with a size hint, and with None.
        def bufio():
            # Fresh reader for every call, since each one consumes the mock.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered reader,
        # and the read sizes expected to be recorded by the raw mock.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw stream's own readall() must behave the same way.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())
    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size gathers every raw chunk.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    # Worker: read from the shared buffered reader until EOF.
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # Bogus seek()/tell() results from the raw stream must surface as
        # IOError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))


class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the reader tests against the C implementation.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)
    def test_initialization(self):
        # After a failed __init__ the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Self-reference so that only the cyclic collector can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertTrue(wr() is None, wr)
998 # The Python version has __del__, so it ends into gc.garbage instead 999 rawio = self.FileIO(support.TESTFN, "w+b") 1000 f = self.tp(rawio) 1001 f.f = f 1002 wr = weakref.ref(f) 1003 del f 1004 support.gc_collect() 1005 self.assertTrue(wr() is None, wr) 1006 1007 def test_args_error(self): 1008 # Issue #17275 1009 with self.assertRaisesRegexp(TypeError, "BufferedReader"): 1010 self.tp(io.BytesIO(), 1024, 1024, 1024) 1011 1012 1013class PyBufferedReaderTest(BufferedReaderTest): 1014 tp = pyio.BufferedReader 1015 1016 1017class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1018 write_mode = "wb" 1019 1020 def test_constructor(self): 1021 rawio = self.MockRawIO() 1022 bufio = self.tp(rawio) 1023 bufio.__init__(rawio) 1024 bufio.__init__(rawio, buffer_size=1024) 1025 bufio.__init__(rawio, buffer_size=16) 1026 self.assertEqual(3, bufio.write(b"abc")) 1027 bufio.flush() 1028 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1029 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1030 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1031 bufio.__init__(rawio) 1032 self.assertEqual(3, bufio.write(b"ghi")) 1033 bufio.flush() 1034 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1035 1036 def test_detach_flush(self): 1037 raw = self.MockRawIO() 1038 buf = self.tp(raw) 1039 buf.write(b"howdy!") 1040 self.assertFalse(raw._write_stack) 1041 buf.detach() 1042 self.assertEqual(raw._write_stack, [b"howdy!"]) 1043 1044 def test_write(self): 1045 # Write to the buffered IO but don't overflow the buffer. 
1046 writer = self.MockRawIO() 1047 bufio = self.tp(writer, 8) 1048 bufio.write(b"abc") 1049 self.assertFalse(writer._write_stack) 1050 1051 def test_write_overflow(self): 1052 writer = self.MockRawIO() 1053 bufio = self.tp(writer, 8) 1054 contents = b"abcdefghijklmnop" 1055 for n in range(0, len(contents), 3): 1056 bufio.write(contents[n:n+3]) 1057 flushed = b"".join(writer._write_stack) 1058 # At least (total - 8) bytes were implicitly flushed, perhaps more 1059 # depending on the implementation. 1060 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1061 1062 def check_writes(self, intermediate_func): 1063 # Lots of writes, test the flushed output is as expected. 1064 contents = bytes(range(256)) * 1000 1065 n = 0 1066 writer = self.MockRawIO() 1067 bufio = self.tp(writer, 13) 1068 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1069 def gen_sizes(): 1070 for size in count(1): 1071 for i in range(15): 1072 yield size 1073 sizes = gen_sizes() 1074 while n < len(contents): 1075 size = min(next(sizes), len(contents) - n) 1076 self.assertEqual(bufio.write(contents[n:n+size]), size) 1077 intermediate_func(bufio) 1078 n += size 1079 bufio.flush() 1080 self.assertEqual(contents, 1081 b"".join(writer._write_stack)) 1082 1083 def test_writes(self): 1084 self.check_writes(lambda bufio: None) 1085 1086 def test_writes_and_flushes(self): 1087 self.check_writes(lambda bufio: bufio.flush()) 1088 1089 def test_writes_and_seeks(self): 1090 def _seekabs(bufio): 1091 pos = bufio.tell() 1092 bufio.seek(pos + 1, 0) 1093 bufio.seek(pos - 1, 0) 1094 bufio.seek(pos, 0) 1095 self.check_writes(_seekabs) 1096 def _seekrel(bufio): 1097 pos = bufio.seek(0, 1) 1098 bufio.seek(+1, 1) 1099 bufio.seek(-1, 1) 1100 bufio.seek(pos, 0) 1101 self.check_writes(_seekrel) 1102 1103 def test_writes_and_truncates(self): 1104 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1105 1106 def test_write_non_blocking(self): 1107 raw = self.MockNonBlockWriterIO() 
1108 bufio = self.tp(raw, 8) 1109 1110 self.assertEqual(bufio.write(b"abcd"), 4) 1111 self.assertEqual(bufio.write(b"efghi"), 5) 1112 # 1 byte will be written, the rest will be buffered 1113 raw.block_on(b"k") 1114 self.assertEqual(bufio.write(b"jklmn"), 5) 1115 1116 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1117 raw.block_on(b"0") 1118 try: 1119 bufio.write(b"opqrwxyz0123456789") 1120 except self.BlockingIOError as e: 1121 written = e.characters_written 1122 else: 1123 self.fail("BlockingIOError should have been raised") 1124 self.assertEqual(written, 16) 1125 self.assertEqual(raw.pop_written(), 1126 b"abcdefghijklmnopqrwxyz") 1127 1128 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1129 s = raw.pop_written() 1130 # Previously buffered bytes were flushed 1131 self.assertTrue(s.startswith(b"01234567A"), s) 1132 1133 def test_write_and_rewind(self): 1134 raw = io.BytesIO() 1135 bufio = self.tp(raw, 4) 1136 self.assertEqual(bufio.write(b"abcdef"), 6) 1137 self.assertEqual(bufio.tell(), 6) 1138 bufio.seek(0, 0) 1139 self.assertEqual(bufio.write(b"XY"), 2) 1140 bufio.seek(6, 0) 1141 self.assertEqual(raw.getvalue(), b"XYcdef") 1142 self.assertEqual(bufio.write(b"123456"), 6) 1143 bufio.flush() 1144 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1145 1146 def test_flush(self): 1147 writer = self.MockRawIO() 1148 bufio = self.tp(writer, 8) 1149 bufio.write(b"abc") 1150 bufio.flush() 1151 self.assertEqual(b"abc", writer._write_stack[0]) 1152 1153 def test_writelines(self): 1154 l = [b'ab', b'cd', b'ef'] 1155 writer = self.MockRawIO() 1156 bufio = self.tp(writer, 8) 1157 bufio.writelines(l) 1158 bufio.flush() 1159 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1160 1161 def test_writelines_userlist(self): 1162 l = UserList([b'ab', b'cd', b'ef']) 1163 writer = self.MockRawIO() 1164 bufio = self.tp(writer, 8) 1165 bufio.writelines(l) 1166 bufio.flush() 1167 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1168 1169 def 
test_writelines_error(self): 1170 writer = self.MockRawIO() 1171 bufio = self.tp(writer, 8) 1172 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1173 self.assertRaises(TypeError, bufio.writelines, None) 1174 1175 def test_destructor(self): 1176 writer = self.MockRawIO() 1177 bufio = self.tp(writer, 8) 1178 bufio.write(b"abc") 1179 del bufio 1180 support.gc_collect() 1181 self.assertEqual(b"abc", writer._write_stack[0]) 1182 1183 def test_truncate(self): 1184 # Truncate implicitly flushes the buffer. 1185 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1186 bufio = self.tp(raw, 8) 1187 bufio.write(b"abcdef") 1188 self.assertEqual(bufio.truncate(3), 3) 1189 self.assertEqual(bufio.tell(), 6) 1190 with self.open(support.TESTFN, "rb", buffering=0) as f: 1191 self.assertEqual(f.read(), b"abc") 1192 1193 @unittest.skipUnless(threading, 'Threading required for this test.') 1194 @support.requires_resource('cpu') 1195 def test_threads(self): 1196 try: 1197 # Write out many bytes from many threads and test they were 1198 # all flushed. 1199 N = 1000 1200 contents = bytes(range(256)) * N 1201 sizes = cycle([1, 19]) 1202 n = 0 1203 queue = deque() 1204 while n < len(contents): 1205 size = next(sizes) 1206 queue.append(contents[n:n+size]) 1207 n += size 1208 del contents 1209 # We use a real file object because it allows us to 1210 # exercise situations where the GIL is released before 1211 # writing the buffer to the raw streams. This is in addition 1212 # to concurrency issues due to switching threads in the middle 1213 # of Python code. 
1214 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1215 bufio = self.tp(raw, 8) 1216 errors = [] 1217 def f(): 1218 try: 1219 while True: 1220 try: 1221 s = queue.popleft() 1222 except IndexError: 1223 return 1224 bufio.write(s) 1225 except Exception as e: 1226 errors.append(e) 1227 raise 1228 threads = [threading.Thread(target=f) for x in range(20)] 1229 for t in threads: 1230 t.start() 1231 time.sleep(0.02) # yield 1232 for t in threads: 1233 t.join() 1234 self.assertFalse(errors, 1235 "the following exceptions were caught: %r" % errors) 1236 bufio.close() 1237 with self.open(support.TESTFN, "rb") as f: 1238 s = f.read() 1239 for i in range(256): 1240 self.assertEqual(s.count(bytes([i])), N) 1241 finally: 1242 support.unlink(support.TESTFN) 1243 1244 def test_misbehaved_io(self): 1245 rawio = self.MisbehavedRawIO() 1246 bufio = self.tp(rawio, 5) 1247 self.assertRaises(IOError, bufio.seek, 0) 1248 self.assertRaises(IOError, bufio.tell) 1249 self.assertRaises(IOError, bufio.write, b"abcdef") 1250 1251 def test_max_buffer_size_deprecation(self): 1252 with support.check_warnings(("max_buffer_size is deprecated", 1253 DeprecationWarning)): 1254 self.tp(self.MockRawIO(), 8, 12) 1255 1256 def test_write_error_on_close(self): 1257 raw = self.MockRawIO() 1258 def bad_write(b): 1259 raise IOError() 1260 raw.write = bad_write 1261 b = self.tp(raw) 1262 b.write(b'spam') 1263 self.assertRaises(IOError, b.close) # exception not swallowed 1264 self.assertTrue(b.closed) 1265 1266 1267class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1268 tp = io.BufferedWriter 1269 1270 def test_constructor(self): 1271 BufferedWriterTest.test_constructor(self) 1272 # The allocation can succeed on 32-bit builds, e.g. with more 1273 # than 2GB RAM and a 64-bit kernel. 
1274 if sys.maxsize > 0x7FFFFFFF: 1275 rawio = self.MockRawIO() 1276 bufio = self.tp(rawio) 1277 self.assertRaises((OverflowError, MemoryError, ValueError), 1278 bufio.__init__, rawio, sys.maxsize) 1279 1280 def test_initialization(self): 1281 rawio = self.MockRawIO() 1282 bufio = self.tp(rawio) 1283 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1284 self.assertRaises(ValueError, bufio.write, b"def") 1285 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1286 self.assertRaises(ValueError, bufio.write, b"def") 1287 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1288 self.assertRaises(ValueError, bufio.write, b"def") 1289 1290 def test_garbage_collection(self): 1291 # C BufferedWriter objects are collected, and collecting them flushes 1292 # all data to disk. 1293 # The Python version has __del__, so it ends into gc.garbage instead 1294 rawio = self.FileIO(support.TESTFN, "w+b") 1295 f = self.tp(rawio) 1296 f.write(b"123xxx") 1297 f.x = f 1298 wr = weakref.ref(f) 1299 del f 1300 support.gc_collect() 1301 self.assertTrue(wr() is None, wr) 1302 with self.open(support.TESTFN, "rb") as f: 1303 self.assertEqual(f.read(), b"123xxx") 1304 1305 def test_args_error(self): 1306 # Issue #17275 1307 with self.assertRaisesRegexp(TypeError, "BufferedWriter"): 1308 self.tp(io.BytesIO(), 1024, 1024, 1024) 1309 1310 1311class PyBufferedWriterTest(BufferedWriterTest): 1312 tp = pyio.BufferedWriter 1313 1314class BufferedRWPairTest(unittest.TestCase): 1315 1316 def test_constructor(self): 1317 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1318 self.assertFalse(pair.closed) 1319 1320 def test_detach(self): 1321 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1322 self.assertRaises(self.UnsupportedOperation, pair.detach) 1323 1324 def test_constructor_max_buffer_size_deprecation(self): 1325 with support.check_warnings(("max_buffer_size is deprecated", 1326 DeprecationWarning)): 1327 self.tp(self.MockRawIO(), 
self.MockRawIO(), 8, 12) 1328 1329 def test_constructor_with_not_readable(self): 1330 class NotReadable(MockRawIO): 1331 def readable(self): 1332 return False 1333 1334 self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) 1335 1336 def test_constructor_with_not_writeable(self): 1337 class NotWriteable(MockRawIO): 1338 def writable(self): 1339 return False 1340 1341 self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) 1342 1343 def test_read(self): 1344 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1345 1346 self.assertEqual(pair.read(3), b"abc") 1347 self.assertEqual(pair.read(1), b"d") 1348 self.assertEqual(pair.read(), b"ef") 1349 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 1350 self.assertEqual(pair.read(None), b"abc") 1351 1352 def test_readlines(self): 1353 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 1354 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1355 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1356 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 1357 1358 def test_read1(self): 1359 # .read1() is delegated to the underlying reader object, so this test 1360 # can be shallow. 
1361 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1362 1363 self.assertEqual(pair.read1(3), b"abc") 1364 1365 def test_readinto(self): 1366 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1367 1368 data = bytearray(5) 1369 self.assertEqual(pair.readinto(data), 5) 1370 self.assertEqual(data, b"abcde") 1371 1372 def test_write(self): 1373 w = self.MockRawIO() 1374 pair = self.tp(self.MockRawIO(), w) 1375 1376 pair.write(b"abc") 1377 pair.flush() 1378 pair.write(b"def") 1379 pair.flush() 1380 self.assertEqual(w._write_stack, [b"abc", b"def"]) 1381 1382 def test_peek(self): 1383 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1384 1385 self.assertTrue(pair.peek(3).startswith(b"abc")) 1386 self.assertEqual(pair.read(3), b"abc") 1387 1388 def test_readable(self): 1389 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1390 self.assertTrue(pair.readable()) 1391 1392 def test_writeable(self): 1393 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1394 self.assertTrue(pair.writable()) 1395 1396 def test_seekable(self): 1397 # BufferedRWPairs are never seekable, even if their readers and writers 1398 # are. 1399 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1400 self.assertFalse(pair.seekable()) 1401 1402 # .flush() is delegated to the underlying writer object and has been 1403 # tested in the test_write method. 
# NOTE(review): the two methods below continue a test class whose header lies
# before these lines (BufferedRWPairTest, judging by the subclasses that
# follow) -- confirm against the preceding lines.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_isatty(self):
        # Raw stream whose isatty() answer is chosen at construction time.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side is one.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the C implementation."""
    tp = io.BufferedRWPair

class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against the pure-Python (_pyio) class."""
    tp = pyio.BufferedRWPair


class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests for BufferedRandom: the reader and writer tests combined, plus
    tests that interleave reads, writes and seeks on one object."""
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # The read below is what pushes the pending writes to the raw stream.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7 in place, then re-read the whole stream.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: relative to the end; whence=1: relative to the current
        # position.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float position is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # read_func(bufio[, n]) abstracts the read primitive under test
        # (read, readinto, peek...); it must return the bytes obtained.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # n < 0 means "read everything": use an oversized scratch buffer.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position; emulate a read by
            # seeking forward over the bytes that are returned.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Step back one byte and read it again after every write.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Expected: byte j set to 2, then byte i set to 1 (so 1 wins
                # when i == j, matching the order of the writes in mutate()).
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() with no argument cuts at the current logical position.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with each read primitive; every odd byte
        # is overwritten and every even byte must be read back unchanged.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Same idea with readline(): each write replaces the first byte of
        # the next line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')


class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
                          BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against the C implementation."""
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        CBufferedReaderTest.test_garbage_collection(self)
        CBufferedWriterTest.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)


class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against the pure-Python (_pyio) class."""
    tp = pyio.BufferedRandom


# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
1695# - A single input byte can correspond to many characters of output. 1696# - The number of output characters can be undetermined until the 1697# last input byte is received. 1698# - The number of output characters can vary depending on previous input. 1699 1700class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 1701 """ 1702 For testing seek/tell behavior with a stateful, buffering decoder. 1703 1704 Input is a sequence of words. Words may be fixed-length (length set 1705 by input) or variable-length (period-terminated). In variable-length 1706 mode, extra periods are ignored. Possible words are: 1707 - 'i' followed by a number sets the input length, I (maximum 99). 1708 When I is set to 0, words are space-terminated. 1709 - 'o' followed by a number sets the output length, O (maximum 99). 1710 - Any other word is converted into a word followed by a period on 1711 the output. The output word consists of the input word truncated 1712 or padded out with hyphens to make its length equal to O. If O 1713 is 0, the word is output verbatim without truncating or padding. 1714 I and O are initially set to 1. When I changes, any buffered input is 1715 re-scanned according to the new I. EOF also terminates the last word. 
1716 """ 1717 1718 def __init__(self, errors='strict'): 1719 codecs.IncrementalDecoder.__init__(self, errors) 1720 self.reset() 1721 1722 def __repr__(self): 1723 return '<SID %x>' % id(self) 1724 1725 def reset(self): 1726 self.i = 1 1727 self.o = 1 1728 self.buffer = bytearray() 1729 1730 def getstate(self): 1731 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 1732 return bytes(self.buffer), i*100 + o 1733 1734 def setstate(self, state): 1735 buffer, io = state 1736 self.buffer = bytearray(buffer) 1737 i, o = divmod(io, 100) 1738 self.i, self.o = i ^ 1, o ^ 1 1739 1740 def decode(self, input, final=False): 1741 output = '' 1742 for b in input: 1743 if self.i == 0: # variable-length, terminated with period 1744 if b == '.': 1745 if self.buffer: 1746 output += self.process_word() 1747 else: 1748 self.buffer.append(b) 1749 else: # fixed-length, terminate after self.i bytes 1750 self.buffer.append(b) 1751 if len(self.buffer) == self.i: 1752 output += self.process_word() 1753 if final and self.buffer: # EOF terminates the last word 1754 output += self.process_word() 1755 return output 1756 1757 def process_word(self): 1758 output = '' 1759 if self.buffer[0] == ord('i'): 1760 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 1761 elif self.buffer[0] == ord('o'): 1762 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 1763 else: 1764 output = self.buffer.decode('ascii') 1765 if len(output) < self.o: 1766 output += '-'*self.o # pad out with hyphens 1767 if self.o: 1768 output = output[:self.o] # truncate to output length 1769 output += '.' 
1770 self.buffer = bytearray() 1771 return output 1772 1773 codecEnabled = False 1774 1775 @classmethod 1776 def lookupTestDecoder(cls, name): 1777 if cls.codecEnabled and name == 'test_decoder': 1778 latin1 = codecs.lookup('latin-1') 1779 return codecs.CodecInfo( 1780 name='test_decoder', encode=latin1.encode, decode=None, 1781 incrementalencoder=None, 1782 streamreader=None, streamwriter=None, 1783 incrementaldecoder=cls) 1784 1785# Register the previous decoder for testing. 1786# Disabled by default, tests will enable it. 1787codecs.register(StatefulIncrementalDecoder.lookupTestDecoder) 1788 1789 1790class StatefulIncrementalDecoderTest(unittest.TestCase): 1791 """ 1792 Make sure the StatefulIncrementalDecoder actually works. 1793 """ 1794 1795 test_cases = [ 1796 # I=1, O=1 (fixed-length input == fixed-length output) 1797 (b'abcd', False, 'a.b.c.d.'), 1798 # I=0, O=0 (variable-length input, variable-length output) 1799 (b'oiabcd', True, 'abcd.'), 1800 # I=0, O=0 (should ignore extra periods) 1801 (b'oi...abcd...', True, 'abcd.'), 1802 # I=0, O=6 (variable-length input, fixed-length output) 1803 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 1804 # I=2, O=6 (fixed-length input < fixed-length output) 1805 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 1806 # I=6, O=3 (fixed-length input > fixed-length output) 1807 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 1808 # I=0, then 3; O=29, then 15 (with longer output) 1809 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 1810 'a----------------------------.' + 1811 'b----------------------------.' + 1812 'cde--------------------------.' + 1813 'abcdefghijabcde.' + 1814 'a.b------------.' + 1815 '.c.------------.' + 1816 'd.e------------.' + 1817 'k--------------.' + 1818 'l--------------.' + 1819 'm--------------.') 1820 ] 1821 1822 def test_decoder(self): 1823 # Try a few one-shot test cases. 
1824 for input, eof, output in self.test_cases: 1825 d = StatefulIncrementalDecoder() 1826 self.assertEqual(d.decode(input, eof), output) 1827 1828 # Also test an unfinished decode, followed by forcing EOF. 1829 d = StatefulIncrementalDecoder() 1830 self.assertEqual(d.decode(b'oiabcd'), '') 1831 self.assertEqual(d.decode(b'', 1), 'abcd.') 1832 1833class TextIOWrapperTest(unittest.TestCase): 1834 1835 def setUp(self): 1836 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 1837 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 1838 support.unlink(support.TESTFN) 1839 1840 def tearDown(self): 1841 support.unlink(support.TESTFN) 1842 1843 def test_constructor(self): 1844 r = self.BytesIO(b"\xc3\xa9\n\n") 1845 b = self.BufferedReader(r, 1000) 1846 t = self.TextIOWrapper(b) 1847 t.__init__(b, encoding="latin1", newline="\r\n") 1848 self.assertEqual(t.encoding, "latin1") 1849 self.assertEqual(t.line_buffering, False) 1850 t.__init__(b, encoding="utf8", line_buffering=True) 1851 self.assertEqual(t.encoding, "utf8") 1852 self.assertEqual(t.line_buffering, True) 1853 self.assertEqual("\xe9\n", t.readline()) 1854 self.assertRaises(TypeError, t.__init__, b, newline=42) 1855 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 1856 1857 def test_detach(self): 1858 r = self.BytesIO() 1859 b = self.BufferedWriter(r) 1860 t = self.TextIOWrapper(b) 1861 self.assertIs(t.detach(), b) 1862 1863 t = self.TextIOWrapper(b, encoding="ascii") 1864 t.write("howdy") 1865 self.assertFalse(r.getvalue()) 1866 t.detach() 1867 self.assertEqual(r.getvalue(), b"howdy") 1868 self.assertRaises(ValueError, t.detach) 1869 1870 def test_repr(self): 1871 raw = self.BytesIO("hello".encode("utf-8")) 1872 b = self.BufferedReader(raw) 1873 t = self.TextIOWrapper(b, encoding="utf-8") 1874 modname = self.TextIOWrapper.__module__ 1875 self.assertEqual(repr(t), 1876 "<%s.TextIOWrapper encoding='utf-8'>" % modname) 1877 raw.name = "dummy" 1878 self.assertEqual(repr(t), 1879 
"<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname) 1880 raw.name = b"dummy" 1881 self.assertEqual(repr(t), 1882 "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 1883 1884 def test_line_buffering(self): 1885 r = self.BytesIO() 1886 b = self.BufferedWriter(r, 1000) 1887 t = self.TextIOWrapper(b, newline="\n", line_buffering=True) 1888 t.write("X") 1889 self.assertEqual(r.getvalue(), b"") # No flush happened 1890 t.write("Y\nZ") 1891 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 1892 t.write("A\rB") 1893 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 1894 1895 def test_encoding(self): 1896 # Check the encoding attribute is always set, and valid 1897 b = self.BytesIO() 1898 t = self.TextIOWrapper(b, encoding="utf8") 1899 self.assertEqual(t.encoding, "utf8") 1900 t = self.TextIOWrapper(b) 1901 self.assertTrue(t.encoding is not None) 1902 codecs.lookup(t.encoding) 1903 1904 def test_encoding_errors_reading(self): 1905 # (1) default 1906 b = self.BytesIO(b"abc\n\xff\n") 1907 t = self.TextIOWrapper(b, encoding="ascii") 1908 self.assertRaises(UnicodeError, t.read) 1909 # (2) explicit strict 1910 b = self.BytesIO(b"abc\n\xff\n") 1911 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 1912 self.assertRaises(UnicodeError, t.read) 1913 # (3) ignore 1914 b = self.BytesIO(b"abc\n\xff\n") 1915 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 1916 self.assertEqual(t.read(), "abc\n\n") 1917 # (4) replace 1918 b = self.BytesIO(b"abc\n\xff\n") 1919 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 1920 self.assertEqual(t.read(), "abc\n\ufffd\n") 1921 1922 def test_encoding_errors_writing(self): 1923 # (1) default 1924 b = self.BytesIO() 1925 t = self.TextIOWrapper(b, encoding="ascii") 1926 self.assertRaises(UnicodeError, t.write, "\xff") 1927 # (2) explicit strict 1928 b = self.BytesIO() 1929 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 1930 self.assertRaises(UnicodeError, t.write, "\xff") 1931 # (3) 
ignore 1932 b = self.BytesIO() 1933 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 1934 newline="\n") 1935 t.write("abc\xffdef\n") 1936 t.flush() 1937 self.assertEqual(b.getvalue(), b"abcdef\n") 1938 # (4) replace 1939 b = self.BytesIO() 1940 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 1941 newline="\n") 1942 t.write("abc\xffdef\n") 1943 t.flush() 1944 self.assertEqual(b.getvalue(), b"abc?def\n") 1945 1946 def test_newlines(self): 1947 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 1948 1949 tests = [ 1950 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 1951 [ '', input_lines ], 1952 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 1953 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 1954 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 1955 ] 1956 encodings = ( 1957 'utf-8', 'latin-1', 1958 'utf-16', 'utf-16-le', 'utf-16-be', 1959 'utf-32', 'utf-32-le', 'utf-32-be', 1960 ) 1961 1962 # Try a range of buffer sizes to test the case where \r is the last 1963 # character in TextIOWrapper._pending_line. 
1964 for encoding in encodings: 1965 # XXX: str.encode() should return bytes 1966 data = bytes(''.join(input_lines).encode(encoding)) 1967 for do_reads in (False, True): 1968 for bufsize in range(1, 10): 1969 for newline, exp_lines in tests: 1970 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 1971 textio = self.TextIOWrapper(bufio, newline=newline, 1972 encoding=encoding) 1973 if do_reads: 1974 got_lines = [] 1975 while True: 1976 c2 = textio.read(2) 1977 if c2 == '': 1978 break 1979 self.assertEqual(len(c2), 2) 1980 got_lines.append(c2 + textio.readline()) 1981 else: 1982 got_lines = list(textio) 1983 1984 for got_line, exp_line in zip(got_lines, exp_lines): 1985 self.assertEqual(got_line, exp_line) 1986 self.assertEqual(len(got_lines), len(exp_lines)) 1987 1988 def test_newlines_input(self): 1989 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 1990 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 1991 for newline, expected in [ 1992 (None, normalized.decode("ascii").splitlines(True)), 1993 ("", testdata.decode("ascii").splitlines(True)), 1994 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 1995 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 1996 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 1997 ]: 1998 buf = self.BytesIO(testdata) 1999 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2000 self.assertEqual(txt.readlines(), expected) 2001 txt.seek(0) 2002 self.assertEqual(txt.read(), "".join(expected)) 2003 2004 def test_newlines_output(self): 2005 testdict = { 2006 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2007 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2008 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2009 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2010 } 2011 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 2012 for newline, expected in tests: 2013 buf = self.BytesIO() 2014 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2015 
def test_destructor(self):
    # Dropping the last reference to a TextIOWrapper must close the
    # underlying buffer after the pending text has reached it.  The
    # buffer's close() override records its contents at that moment.
    seen_at_close = []
    bytesio_cls = self.BytesIO

    class RecordingBytesIO(bytesio_cls):
        def close(self):
            seen_at_close.append(self.getvalue())
            bytesio_cls.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    # No explicit flush()/close(): finalization has to do both.
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
def test_basic_io(self):
    # Round-trip a short string through a real text file for a range of
    # decoder chunk sizes and encodings, checking write/read/seek/tell.
    # Both file objects are managed by `with` so that a failing assertion
    # cannot leave TESTFN open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are not exercised here.
        for enc in "ascii", "latin1", "utf8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                # Reads at end-of-file return the empty string.
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                # Append, then read the appended text back from the old end.
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
def test_seeking(self):
    # tell() must report the right position after a partial read, and
    # the following readline() must return the rest of the line.
    # Both file objects are closed by `with`, including on failure.
    chunk_size = _default_chunk_size()
    # An ASCII prefix two bytes shorter than the default chunk size,
    # followed by a multi-byte character and a newline.
    # NOTE(review): presumably chosen so the encoded character crosses
    # the chunk boundary -- confirm against the TextIOWrapper code.
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
def test_encoded_writes(self):
    # Write the same text twice through each UTF-16/UTF-32 codec and
    # check that the BOM is written only once (see issue1753).
    text = "1234567890"
    doubled = text * 2
    encodings = [family + variant
                 for family in ("utf-16", "utf-32")
                 for variant in ("", "-le", "-be")]
    for encoding in encodings:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=encoding)
        wrapper.write(text)
        wrapper.write(text)
        # Read everything back twice, rewinding before each read.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        # The raw bytes must equal a single encode() of the whole text.
        self.assertEqual(raw.getvalue(), doubled.encode(encoding))
def test_writelines_error(self):
    # writelines() must raise TypeError for a list of ints, for None,
    # and for a byte string.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, wrapper.writelines, bad_lines)
def test_seek_bom(self):
    # Writing after manual seeks in 'r+' mode must not emit another BOM:
    # the file must end up identical to one encode() of the final text.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Write at the recorded end first, then overwrite the start.
            updater.seek(end_cookie)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        expected = 'bbbzzz'.encode(charset)
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), expected)
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    #
    # NonbytesStream exposes StringIO.read as read1, so the stream
    # wrapped by TextIOWrapper hands back text instead of bytes.
    # (The class used to be defined twice in a row; one definition is
    # enough.)
    class NonbytesStream(self.StringIO):
        read1 = self.StringIO.read
    # maybeRaises is supplied by the concrete subclasses: the C variant
    # requires TypeError, the Python variant accepts either outcome.
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.read(1)
    t = self.TextIOWrapper(NonbytesStream('a'))
    with self.maybeRaises(TypeError):
        t.readline()
    # An unbounded read() is expected to succeed and return the text.
    t = self.TextIOWrapper(NonbytesStream('a'))
    self.assertEqual(t.read(), u'a')
def test_initialization(self):
    # Re-running __init__ with an invalid `newline` must fail: TypeError
    # for the integer 42, ValueError for the string 'xyzzy'.  After each
    # failed re-initialisation, read() must raise ValueError.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)
    for expected_error, bad_newline in ((TypeError, 42),
                                        (ValueError, 'xyzzy')):
        self.assertRaises(expected_error, wrapper.__init__, buffered,
                          newline=bad_newline)
        self.assertRaises(ValueError, wrapper.read)
def check_newline_decoding(self, decoder, encoding):
    """Feed *decoder* a fixed script one unit at a time and check both
    its ``newlines`` attribute and the translated output.

    decoder -- the newline decoder under test.
    encoding -- codec name used to encode the script before feeding it,
        or None to feed the text itself, one character at a time.
    """
    if encoding is None:
        encoder = None
    else:
        encoder = codecs.getincrementalencoder(encoding)()
    pieces = []

    def feed(text):
        # One element of the encoded data (or one character of the text
        # when there is no encoder) per decode() call.
        units = text if encoder is None else encoder.encode(text)
        for unit in units:
            pieces.append(decoder.decode(unit))

    self.assertEqual(decoder.newlines, None)
    # (chunk to feed, value of decoder.newlines expected afterwards)
    script = (
        ("abc\n\r", '\n'),
        ("\nabc", ('\n', '\r\n')),
        ("abc\r", ('\n', '\r\n')),
        ("abc", ('\r', '\n', '\r\n')),
    )
    for chunk, seen in script:
        feed(chunk)
        self.assertEqual(decoder.newlines, seen)
    feed("abc\r")
    self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

    # After reset() the decoder must report no newlines seen.
    decoder.reset()
    payload = "abc"
    if encoder is not None:
        encoder.reset()
        payload = encoder.encode(payload)
    self.assertEqual(decoder.decode(payload), "abc")
    self.assertEqual(decoder.newlines, None)
def test___all__(self):
    # Every name exported through __all__ must exist on the module.
    # Apart from "open" and the SEEK_* constants, each one must be an
    # Exception subclass (error names) or an IOBase subclass.
    module = self.io
    for name in module.__all__:
        obj = getattr(module, name, None)
        self.assertTrue(obj is not None, name)
        if name == "open":
            continue
        if "error" in name.lower() or name == "UnsupportedOperation":
            self.assertTrue(issubclass(obj, Exception), name)
            continue
        if name.startswith("SEEK_"):
            continue
        self.assertTrue(issubclass(obj, self.IOBase))
def test_io_after_close(self):
    # Every I/O method of a closed file object must raise ValueError,
    # for each combination of mode and buffering listed below.
    open_variants = []
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_variants.extend([
            {"mode": base_mode},
            {"mode": binary_mode},
            {"mode": base_mode, "buffering": 1},
            {"mode": base_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        ])

    for kwargs in open_variants:
        f = self.open(support.TESTFN, **kwargs)
        f.close()

        def rejects(name, *args):
            self.assertRaises(ValueError, getattr(f, name), *args)

        def rejects_if_present(name, *args):
            # Some methods are checked only when the object has them.
            if hasattr(f, name):
                rejects(name, *args)

        rejects("flush")
        rejects("fileno")
        rejects("isatty")
        rejects("__iter__")
        rejects_if_present("peek", 1)
        rejects("read")
        rejects_if_present("read1", 1024)
        rejects_if_present("readall")
        rejects_if_present("readinto", bytearray(1024))
        rejects("readline")
        rejects("readlines")
        rejects("seek", 0)
        rejects("tell")
        rejects("truncate")
        # write() gets bytes in binary modes and text otherwise.
        self.assertRaises(ValueError, f.write,
                          b"" if "b" in kwargs['mode'] else "")
        rejects("writelines", [])
        self.assertRaises(ValueError, next, f)
def _check_abc_inheritance(self, abcmodule):
    """Assert that the objects returned by self.open() are instances of
    exactly the expected base classes taken from *abcmodule*.

    abcmodule -- object providing the IOBase, RawIOBase, BufferedIOBase
        and TextIOBase attributes to check against.
    """
    abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
    layers = (
        # (mode, extra open() keywords, expected isinstance() per ABC)
        ("wb", {"buffering": 0}, (True, True, False, False)),
        ("wb", {}, (True, False, True, False)),
        ("w", {}, (True, False, False, True)),
    )
    for mode, extra, expected in layers:
        with self.open(support.TESTFN, mode, **extra) as f:
            for abc_name, is_instance in zip(abc_names, expected):
                base = getattr(abcmodule, abc_name)
                if is_instance:
                    self.assertIsInstance(f, base)
                else:
                    self.assertNotIsInstance(f, base)
def _set_non_blocking(self, fd):
    """Add O_NONBLOCK to the file status flags of descriptor *fd*,
    asserting that both fcntl() calls report success."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    self.assertNotEqual(current, -1)
    status = fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
    self.assertEqual(status, 0)
@unittest.skipUnless(threading, 'Threading required for this test.')
@unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
                 'issue #12429: skip test on FreeBSD <= 7')
def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
    """Check that a partial write, when it gets interrupted, properly
    invokes the signal handler, and bubbles up the exception raised
    in the latter.

    item -- unit of data repeated to build a write larger than
        support.PIPE_MAX_SIZE.
    bytes -- byte string whose first two bytes are expected to be read
        back from the pipe.  Inside this method the parameter shadows
        the module-level ``bytes`` name.
    fdopen_kwargs -- passed through to self.io.open() when wrapping the
        write end of the pipe.
    """
    read_results = []
    def _read():
        s = os.read(r, 1)
        read_results.append(s)
    t = threading.Thread(target=_read)
    t.daemon = True
    r, w = os.pipe()
    # Bound before the try block so that the cleanup below is safe even
    # when open() itself raises.
    wio = None
    try:
        wio = self.io.open(w, **fdopen_kwargs)
        t.start()
        signal.alarm(1)
        # Fill the pipe enough that the write will be blocking.
        # It will be interrupted by the timer armed above.  Since the
        # other thread has read one byte, the low-level write will
        # return with a successful (partial) result rather than an EINTR.
        # The buffered IO layer must check for pending signal
        # handlers, which in this case will invoke alarm_interrupt().
        self.assertRaises(ZeroDivisionError,
                    wio.write, item * (support.PIPE_MAX_SIZE // len(item) + 1))
        t.join()
        # We got one byte, get another one and check that it isn't a
        # repeat of the first one.
        read_results.append(os.read(r, 1))
        self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
    finally:
        os.close(w)
        os.close(r)
        # This is deliberate. If we didn't close the file descriptor
        # before closing wio, wio would try to flush its internal
        # buffer, and block again.
        if wio is not None:
            try:
                wio.close()
            except IOError as e:
                if e.errno != errno.EBADF:
                    raise
def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
    """Check that a buffered read, when it gets interrupted (either
    returning a partial result or EINTR), properly invokes the signal
    handler and retries if the latter returned successfully.

    decode -- callable applied to the value returned by read() before
        it is compared with the text "foobar".
    fdopen_kwargs -- passed through to self.io.open() when wrapping the
        read end of the pipe; "closefd" is forced to False.
    """
    r, w = os.pipe()
    fdopen_kwargs["closefd"] = False
    def alarm_handler(sig, frame):
        os.write(w, b"bar")
    signal.signal(signal.SIGALRM, alarm_handler)
    # Bound before the try block so that the cleanup below is safe even
    # when open() itself raises.
    rio = None
    try:
        rio = self.io.open(r, **fdopen_kwargs)
        os.write(w, b"foo")
        signal.alarm(1)
        # Expected behaviour:
        # - first raw read() returns partial b"foo"
        # - second raw read() returns EINTR
        # - third raw read() returns b"bar"
        self.assertEqual(decode(rio.read(6)), "foobar")
    finally:
        # Close the pipe descriptors even if closing rio fails.
        try:
            if rio is not None:
                rio.close()
        finally:
            os.close(w)
            os.close(r)
properly invokes the signal 2977 handler and retries if the latter returned successfully.""" 2978 select = support.import_module("select") 2979 # A quantity that exceeds the buffer size of an anonymous pipe's 2980 # write end. 2981 N = support.PIPE_MAX_SIZE 2982 r, w = os.pipe() 2983 fdopen_kwargs["closefd"] = False 2984 # We need a separate thread to read from the pipe and allow the 2985 # write() to finish. This thread is started after the SIGALRM is 2986 # received (forcing a first EINTR in write()). 2987 read_results = [] 2988 write_finished = False 2989 def _read(): 2990 while not write_finished: 2991 while r in select.select([r], [], [], 1.0)[0]: 2992 s = os.read(r, 1024) 2993 read_results.append(s) 2994 t = threading.Thread(target=_read) 2995 t.daemon = True 2996 def alarm1(sig, frame): 2997 signal.signal(signal.SIGALRM, alarm2) 2998 signal.alarm(1) 2999 def alarm2(sig, frame): 3000 t.start() 3001 signal.signal(signal.SIGALRM, alarm1) 3002 try: 3003 wio = self.io.open(w, **fdopen_kwargs) 3004 signal.alarm(1) 3005 # Expected behaviour: 3006 # - first raw write() is partial (because of the limited pipe buffer 3007 # and the first alarm) 3008 # - second raw write() returns EINTR (because of the second alarm) 3009 # - subsequent write()s are successful (either partial or complete) 3010 self.assertEqual(N, wio.write(item * N)) 3011 wio.flush() 3012 write_finished = True 3013 t.join() 3014 self.assertEqual(N, sum(len(x) for x in read_results)) 3015 finally: 3016 write_finished = True 3017 os.close(w) 3018 os.close(r) 3019 # This is deliberate. If we didn't close the file descriptor 3020 # before closing wio, wio would try to flush its internal 3021 # buffer, and could block (in case of failure). 
3022 try: 3023 wio.close() 3024 except IOError as e: 3025 if e.errno != errno.EBADF: 3026 raise 3027 3028 def test_interrupterd_write_retry_buffered(self): 3029 self.check_interrupted_write_retry(b"x", mode="wb") 3030 3031 def test_interrupterd_write_retry_text(self): 3032 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 3033 3034 3035class CSignalsTest(SignalsTest): 3036 io = io 3037 3038class PySignalsTest(SignalsTest): 3039 io = pyio 3040 3041 # Handling reentrancy issues would slow down _pyio even more, so the 3042 # tests are disabled. 3043 test_reentrant_write_buffered = None 3044 test_reentrant_write_text = None 3045 3046 3047def test_main(): 3048 tests = (CIOTest, PyIOTest, 3049 CBufferedReaderTest, PyBufferedReaderTest, 3050 CBufferedWriterTest, PyBufferedWriterTest, 3051 CBufferedRWPairTest, PyBufferedRWPairTest, 3052 CBufferedRandomTest, PyBufferedRandomTest, 3053 StatefulIncrementalDecoderTest, 3054 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 3055 CTextIOWrapperTest, PyTextIOWrapperTest, 3056 CMiscIOTest, PyMiscIOTest, 3057 CSignalsTest, PySignalsTest, 3058 ) 3059 3060 # Put the namespaces of the IO module we are testing and some useful mock 3061 # classes in the __dict__ of each test. 3062 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 3063 MockNonBlockWriterIO, MockRawIOWithoutRead) 3064 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 3065 c_io_ns = dict((name, getattr(io, name)) for name in all_members) 3066 py_io_ns = dict((name, getattr(pyio, name)) for name in all_members) 3067 globs = globals() 3068 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 3069 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 3070 # Avoid turning open into a bound method. 
3071 py_io_ns["open"] = pyio.OpenWrapper 3072 for test in tests: 3073 if test.__name__.startswith("C"): 3074 for name, obj in c_io_ns.items(): 3075 setattr(test, name, obj) 3076 elif test.__name__.startswith("Py"): 3077 for name, obj in py_io_ns.items(): 3078 setattr(test, name, obj) 3079 3080 support.run_unittest(*tests) 3081 3082if __name__ == "__main__": 3083 test_main() 3084