test_file2k.py revision b0acc1b0a35905fa8dc3d3d10581602b5129c87a
1import sys 2import os 3import unittest 4import itertools 5import select 6import signal 7import subprocess 8import time 9from array import array 10from weakref import proxy 11try: 12 import threading 13except ImportError: 14 threading = None 15 16from test import test_support 17from test.test_support import TESTFN, run_unittest 18from UserList import UserList 19 20class AutoFileTests(unittest.TestCase): 21 # file tests for which a test file is automatically set up 22 23 def setUp(self): 24 self.f = open(TESTFN, 'wb') 25 26 def tearDown(self): 27 if self.f: 28 self.f.close() 29 os.remove(TESTFN) 30 31 def testWeakRefs(self): 32 # verify weak references 33 p = proxy(self.f) 34 p.write('teststring') 35 self.assertEqual(self.f.tell(), p.tell()) 36 self.f.close() 37 self.f = None 38 self.assertRaises(ReferenceError, getattr, p, 'tell') 39 40 def testAttributes(self): 41 # verify expected attributes exist 42 f = self.f 43 with test_support.check_py3k_warnings(): 44 softspace = f.softspace 45 f.name # merely shouldn't blow up 46 f.mode # ditto 47 f.closed # ditto 48 49 with test_support.check_py3k_warnings(): 50 # verify softspace is writable 51 f.softspace = softspace # merely shouldn't blow up 52 53 # verify the others aren't 54 for attr in 'name', 'mode', 'closed': 55 self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops') 56 57 def testReadinto(self): 58 # verify readinto 59 self.f.write('12') 60 self.f.close() 61 a = array('c', 'x'*10) 62 self.f = open(TESTFN, 'rb') 63 n = self.f.readinto(a) 64 self.assertEqual('12', a.tostring()[:n]) 65 66 def testWritelinesUserList(self): 67 # verify writelines with instance sequence 68 l = UserList(['1', '2']) 69 self.f.writelines(l) 70 self.f.close() 71 self.f = open(TESTFN, 'rb') 72 buf = self.f.read() 73 self.assertEqual(buf, '12') 74 75 def testWritelinesIntegers(self): 76 # verify writelines with integers 77 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3]) 78 79 def testWritelinesIntegersUserList(self): 80 # verify writelines with integers in UserList 81 l = UserList([1,2,3]) 82 self.assertRaises(TypeError, self.f.writelines, l) 83 84 def testWritelinesNonString(self): 85 # verify writelines with non-string object 86 class NonString: 87 pass 88 89 self.assertRaises(TypeError, self.f.writelines, 90 [NonString(), NonString()]) 91 92 def testWritelinesBuffer(self): 93 self.f.writelines([array('c', 'abc')]) 94 self.f.close() 95 self.f = open(TESTFN, 'rb') 96 buf = self.f.read() 97 self.assertEqual(buf, 'abc') 98 99 def testRepr(self): 100 # verify repr works 101 self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN)) 102 # see issue #14161 103 # Windows doesn't like \r\n\t" in the file name, but ' is ok 104 fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx" 105 with open(fname, 'w') as f: 106 self.addCleanup(os.remove, fname) 107 self.assertTrue(repr(f).startswith( 108 "<open file %r, mode 'w' at" % fname)) 109 110 def testErrors(self): 111 self.f.close() 112 self.f = open(TESTFN, 'rb') 113 f = self.f 114 self.assertEqual(f.name, TESTFN) 115 self.assertTrue(not f.isatty()) 116 self.assertTrue(not f.closed) 117 118 self.assertRaises(TypeError, f.readinto, "") 119 f.close() 120 self.assertTrue(f.closed) 121 122 def testMethods(self): 123 methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto', 124 'readline', 'readlines', 'seek', 'tell', 'truncate', 125 'write', '__iter__'] 126 deprecated_methods = ['xreadlines'] 127 if sys.platform.startswith('atheos'): 128 methods.remove('truncate') 129 130 # __exit__ should close the file 131 self.f.__exit__(None, None, None) 132 self.assertTrue(self.f.closed) 133 134 for methodname in methods: 135 method = getattr(self.f, methodname) 136 # should raise on closed file 137 self.assertRaises(ValueError, method) 138 with test_support.check_py3k_warnings(): 139 for methodname in deprecated_methods: 140 method = getattr(self.f, methodname) 141 self.assertRaises(ValueError, method) 142 self.assertRaises(ValueError, self.f.writelines, []) 143 144 # file is closed, __exit__ shouldn't do anything 145 self.assertEqual(self.f.__exit__(None, None, None), None) 146 # it must also return None if an exception was given 147 try: 148 1 // 0 149 except: 150 self.assertEqual(self.f.__exit__(*sys.exc_info()), None) 151 152 def testReadWhenWriting(self): 153 self.assertRaises(IOError, self.f.read) 154 155 def testNastyWritelinesGenerator(self): 156 def nasty(): 157 for i in range(5): 158 if i == 3: 159 self.f.close() 160 yield str(i) 161 self.assertRaises(ValueError, self.f.writelines, nasty()) 162 163 def testIssue5677(self): 164 # Remark: Do not perform more than one test per open file, 165 # since that does NOT catch the readline error on Windows. 166 data = 'xxx' 167 for mode in ['w', 'wb', 'a', 'ab']: 168 for attr in ['read', 'readline', 'readlines']: 169 self.f = open(TESTFN, mode) 170 self.f.write(data) 171 self.assertRaises(IOError, getattr(self.f, attr)) 172 self.f.close() 173 174 self.f = open(TESTFN, mode) 175 self.f.write(data) 176 self.assertRaises(IOError, lambda: [line for line in self.f]) 177 self.f.close() 178 179 self.f = open(TESTFN, mode) 180 self.f.write(data) 181 self.assertRaises(IOError, self.f.readinto, bytearray(len(data))) 182 self.f.close() 183 184 for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']: 185 self.f = open(TESTFN, mode) 186 self.assertRaises(IOError, self.f.write, data) 187 self.f.close() 188 189 self.f = open(TESTFN, mode) 190 self.assertRaises(IOError, self.f.writelines, [data, data]) 191 self.f.close() 192 193 self.f = open(TESTFN, mode) 194 self.assertRaises(IOError, self.f.truncate) 195 self.f.close() 196 197class OtherFileTests(unittest.TestCase): 198 199 def testOpenDir(self): 200 this_dir = os.path.dirname(__file__) or os.curdir 201 for mode in (None, "w"): 202 try: 203 if mode: 204 f = open(this_dir, mode) 205 else: 206 f = open(this_dir) 207 except IOError as e: 208 self.assertEqual(e.filename, this_dir) 209 else: 210 self.fail("opening a directory didn't raise an IOError") 211 212 def testModeStrings(self): 213 # check invalid mode strings 214 for mode in ("", "aU", "wU+"): 215 try: 216 f = open(TESTFN, mode) 217 except ValueError: 218 pass 219 else: 220 f.close() 221 self.fail('%r is an invalid file mode' % mode) 222 223 # Some invalid modes fail on Windows, but pass on Unix 224 # Issue3965: avoid a crash on Windows when filename is unicode 225 for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')): 226 try: 227 f = open(name, "rr") 228 except (IOError, ValueError): 229 pass 230 else: 231 f.close() 232 233 def testStdin(self): 234 # This causes the interpreter to exit on OSF1 v5.1. 235 if sys.platform != 'osf1V5': 236 self.assertRaises(IOError, sys.stdin.seek, -1) 237 else: 238 print >>sys.__stdout__, ( 239 ' Skipping sys.stdin.seek(-1), it may crash the interpreter.' 240 ' Test manually.') 241 self.assertRaises(IOError, sys.stdin.truncate) 242 243 def testUnicodeOpen(self): 244 # verify repr works for unicode too 245 f = open(unicode(TESTFN), "w") 246 self.assertTrue(repr(f).startswith("<open file u'" + TESTFN)) 247 f.close() 248 os.unlink(TESTFN) 249 250 def testBadModeArgument(self): 251 # verify that we get a sensible error message for bad mode argument 252 bad_mode = "qwerty" 253 try: 254 f = open(TESTFN, bad_mode) 255 except ValueError, msg: 256 if msg.args[0] != 0: 257 s = str(msg) 258 if TESTFN in s or bad_mode not in s: 259 self.fail("bad error message for invalid mode: %s" % s) 260 # if msg.args[0] == 0, we're probably on Windows where there may 261 # be no obvious way to discover why open() failed. 262 else: 263 f.close() 264 self.fail("no error for invalid mode: %s" % bad_mode) 265 266 def testSetBufferSize(self): 267 # make sure that explicitly setting the buffer size doesn't cause 268 # misbehaviour especially with repeated close() calls 269 for s in (-1, 0, 1, 512): 270 try: 271 f = open(TESTFN, 'w', s) 272 f.write(str(s)) 273 f.close() 274 f.close() 275 f = open(TESTFN, 'r', s) 276 d = int(f.read()) 277 f.close() 278 f.close() 279 except IOError, msg: 280 self.fail('error setting buffer size %d: %s' % (s, str(msg))) 281 self.assertEqual(d, s) 282 283 def testTruncateOnWindows(self): 284 os.unlink(TESTFN) 285 286 def bug801631(): 287 # SF bug <http://www.python.org/sf/801631> 288 # "file.truncate fault on windows" 289 f = open(TESTFN, 'wb') 290 f.write('12345678901') # 11 bytes 291 f.close() 292 293 f = open(TESTFN,'rb+') 294 data = f.read(5) 295 if data != '12345': 296 self.fail("Read on file opened for update failed %r" % data) 297 if f.tell() != 5: 298 self.fail("File pos after read wrong %d" % f.tell()) 299 300 f.truncate() 301 if f.tell() != 5: 302 self.fail("File pos after ftruncate wrong %d" % f.tell()) 303 304 f.close() 305 size = os.path.getsize(TESTFN) 306 if size != 5: 307 self.fail("File size after ftruncate wrong %d" % size) 308 309 try: 310 bug801631() 311 finally: 312 os.unlink(TESTFN) 313 314 def testIteration(self): 315 # Test the complex interaction when mixing file-iteration and the 316 # various read* methods. Ostensibly, the mixture could just be tested 317 # to work when it should work according to the Python language, 318 # instead of fail when it should fail according to the current CPython 319 # implementation. People don't always program Python the way they 320 # should, though, and the implemenation might change in subtle ways, 321 # so we explicitly test for errors, too; the test will just have to 322 # be updated when the implementation changes. 323 dataoffset = 16384 324 filler = "ham\n" 325 assert not dataoffset % len(filler), \ 326 "dataoffset must be multiple of len(filler)" 327 nchunks = dataoffset // len(filler) 328 testlines = [ 329 "spam, spam and eggs\n", 330 "eggs, spam, ham and spam\n", 331 "saussages, spam, spam and eggs\n", 332 "spam, ham, spam and eggs\n", 333 "spam, spam, spam, spam, spam, ham, spam\n", 334 "wonderful spaaaaaam.\n" 335 ] 336 methods = [("readline", ()), ("read", ()), ("readlines", ()), 337 ("readinto", (array("c", " "*100),))] 338 339 try: 340 # Prepare the testfile 341 bag = open(TESTFN, "w") 342 bag.write(filler * nchunks) 343 bag.writelines(testlines) 344 bag.close() 345 # Test for appropriate errors mixing read* and iteration 346 for methodname, args in methods: 347 f = open(TESTFN) 348 if f.next() != filler: 349 self.fail, "Broken testfile" 350 meth = getattr(f, methodname) 351 try: 352 meth(*args) 353 except ValueError: 354 pass 355 else: 356 self.fail("%s%r after next() didn't raise ValueError" % 357 (methodname, args)) 358 f.close() 359 360 # Test to see if harmless (by accident) mixing of read* and 361 # iteration still works. This depends on the size of the internal 362 # iteration buffer (currently 8192,) but we can test it in a 363 # flexible manner. Each line in the bag o' ham is 4 bytes 364 # ("h", "a", "m", "\n"), so 4096 lines of that should get us 365 # exactly on the buffer boundary for any power-of-2 buffersize 366 # between 4 and 16384 (inclusive). 367 f = open(TESTFN) 368 for i in range(nchunks): 369 f.next() 370 testline = testlines.pop(0) 371 try: 372 line = f.readline() 373 except ValueError: 374 self.fail("readline() after next() with supposedly empty " 375 "iteration-buffer failed anyway") 376 if line != testline: 377 self.fail("readline() after next() with empty buffer " 378 "failed. Got %r, expected %r" % (line, testline)) 379 testline = testlines.pop(0) 380 buf = array("c", "\x00" * len(testline)) 381 try: 382 f.readinto(buf) 383 except ValueError: 384 self.fail("readinto() after next() with supposedly empty " 385 "iteration-buffer failed anyway") 386 line = buf.tostring() 387 if line != testline: 388 self.fail("readinto() after next() with empty buffer " 389 "failed. Got %r, expected %r" % (line, testline)) 390 391 testline = testlines.pop(0) 392 try: 393 line = f.read(len(testline)) 394 except ValueError: 395 self.fail("read() after next() with supposedly empty " 396 "iteration-buffer failed anyway") 397 if line != testline: 398 self.fail("read() after next() with empty buffer " 399 "failed. Got %r, expected %r" % (line, testline)) 400 try: 401 lines = f.readlines() 402 except ValueError: 403 self.fail("readlines() after next() with supposedly empty " 404 "iteration-buffer failed anyway") 405 if lines != testlines: 406 self.fail("readlines() after next() with empty buffer " 407 "failed. Got %r, expected %r" % (line, testline)) 408 # Reading after iteration hit EOF shouldn't hurt either 409 f = open(TESTFN) 410 try: 411 for line in f: 412 pass 413 try: 414 f.readline() 415 f.readinto(buf) 416 f.read() 417 f.readlines() 418 except ValueError: 419 self.fail("read* failed after next() consumed file") 420 finally: 421 f.close() 422 finally: 423 os.unlink(TESTFN) 424 425 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.') 426 def test_write_full(self): 427 # Issue #17976 428 try: 429 f = open('/dev/full', 'w', 1) 430 except IOError: 431 self.skipTest("requires '/dev/full'") 432 try: 433 with self.assertRaises(IOError): 434 f.write('hello') 435 f.write('\n') 436 finally: 437 f.close() 438 439class FileSubclassTests(unittest.TestCase): 440 441 def testExit(self): 442 # test that exiting with context calls subclass' close 443 class C(file): 444 def __init__(self, *args): 445 self.subclass_closed = False 446 file.__init__(self, *args) 447 def close(self): 448 self.subclass_closed = True 449 file.close(self) 450 451 with C(TESTFN, 'w') as f: 452 pass 453 self.assertTrue(f.subclass_closed) 454 455 456@unittest.skipUnless(threading, 'Threading required for this test.') 457class FileThreadingTests(unittest.TestCase): 458 # These tests check the ability to call various methods of file objects 459 # (including close()) concurrently without crashing the Python interpreter. 460 # See #815646, #595601 461 462 def setUp(self): 463 self._threads = test_support.threading_setup() 464 self.f = None 465 self.filename = TESTFN 466 with open(self.filename, "w") as f: 467 f.write("\n".join("0123456789")) 468 self._count_lock = threading.Lock() 469 self.close_count = 0 470 self.close_success_count = 0 471 self.use_buffering = False 472 473 def tearDown(self): 474 if self.f: 475 try: 476 self.f.close() 477 except (EnvironmentError, ValueError): 478 pass 479 try: 480 os.remove(self.filename) 481 except EnvironmentError: 482 pass 483 test_support.threading_cleanup(*self._threads) 484 485 def _create_file(self): 486 if self.use_buffering: 487 self.f = open(self.filename, "w+", buffering=1024*16) 488 else: 489 self.f = open(self.filename, "w+") 490 491 def _close_file(self): 492 with self._count_lock: 493 self.close_count += 1 494 self.f.close() 495 with self._count_lock: 496 self.close_success_count += 1 497 498 def _close_and_reopen_file(self): 499 self._close_file() 500 # if close raises an exception thats fine, self.f remains valid so 501 # we don't need to reopen. 502 self._create_file() 503 504 def _run_workers(self, func, nb_workers, duration=0.2): 505 with self._count_lock: 506 self.close_count = 0 507 self.close_success_count = 0 508 self.do_continue = True 509 threads = [] 510 try: 511 for i in range(nb_workers): 512 t = threading.Thread(target=func) 513 t.start() 514 threads.append(t) 515 for _ in xrange(100): 516 time.sleep(duration/100) 517 with self._count_lock: 518 if self.close_count-self.close_success_count > nb_workers+1: 519 if test_support.verbose: 520 print 'Q', 521 break 522 time.sleep(duration) 523 finally: 524 self.do_continue = False 525 for t in threads: 526 t.join() 527 528 def _test_close_open_io(self, io_func, nb_workers=5): 529 def worker(): 530 self._create_file() 531 funcs = itertools.cycle(( 532 lambda: io_func(), 533 lambda: self._close_and_reopen_file(), 534 )) 535 for f in funcs: 536 if not self.do_continue: 537 break 538 try: 539 f() 540 except (IOError, ValueError): 541 pass 542 self._run_workers(worker, nb_workers) 543 if test_support.verbose: 544 # Useful verbose statistics when tuning this test to take 545 # less time to run but still ensuring that its still useful. 546 # 547 # the percent of close calls that raised an error 548 percent = 100. - 100.*self.close_success_count/self.close_count 549 print self.close_count, ('%.4f ' % percent), 550 551 def test_close_open(self): 552 def io_func(): 553 pass 554 self._test_close_open_io(io_func) 555 556 def test_close_open_flush(self): 557 def io_func(): 558 self.f.flush() 559 self._test_close_open_io(io_func) 560 561 def test_close_open_iter(self): 562 def io_func(): 563 list(iter(self.f)) 564 self._test_close_open_io(io_func) 565 566 def test_close_open_isatty(self): 567 def io_func(): 568 self.f.isatty() 569 self._test_close_open_io(io_func) 570 571 def test_close_open_print(self): 572 def io_func(): 573 print >> self.f, '' 574 self._test_close_open_io(io_func) 575 576 def test_close_open_print_buffered(self): 577 self.use_buffering = True 578 def io_func(): 579 print >> self.f, '' 580 self._test_close_open_io(io_func) 581 582 def test_close_open_read(self): 583 def io_func(): 584 self.f.read(0) 585 self._test_close_open_io(io_func) 586 587 def test_close_open_readinto(self): 588 def io_func(): 589 a = array('c', 'xxxxx') 590 self.f.readinto(a) 591 self._test_close_open_io(io_func) 592 593 def test_close_open_readline(self): 594 def io_func(): 595 self.f.readline() 596 self._test_close_open_io(io_func) 597 598 def test_close_open_readlines(self): 599 def io_func(): 600 self.f.readlines() 601 self._test_close_open_io(io_func) 602 603 def test_close_open_seek(self): 604 def io_func(): 605 self.f.seek(0, 0) 606 self._test_close_open_io(io_func) 607 608 def test_close_open_tell(self): 609 def io_func(): 610 self.f.tell() 611 self._test_close_open_io(io_func) 612 613 def test_close_open_truncate(self): 614 def io_func(): 615 self.f.truncate() 616 self._test_close_open_io(io_func) 617 618 def test_close_open_write(self): 619 def io_func(): 620 self.f.write('') 621 self._test_close_open_io(io_func) 622 623 def test_close_open_writelines(self): 624 def io_func(): 625 self.f.writelines('') 626 self._test_close_open_io(io_func) 627 628 629@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.') 630class TestFileSignalEINTR(unittest.TestCase): 631 def _test_reading(self, data_to_write, read_and_verify_code, method_name, 632 universal_newlines=False): 633 """Generic buffered read method test harness to verify EINTR behavior. 634 635 Also validates that Python signal handlers are run during the read. 636 637 Args: 638 data_to_write: String to write to the child process for reading 639 before sending it a signal, confirming the signal was handled, 640 writing a final newline char and closing the infile pipe. 641 read_and_verify_code: Single "line" of code to read from a file 642 object named 'infile' and validate the result. This will be 643 executed as part of a python subprocess fed data_to_write. 644 method_name: The name of the read method being tested, for use in 645 an error message on failure. 646 universal_newlines: If True, infile will be opened in universal 647 newline mode in the child process. 648 """ 649 if universal_newlines: 650 # Test the \r\n -> \n conversion while we're at it. 651 data_to_write = data_to_write.replace('\n', '\r\n') 652 infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")' 653 else: 654 infile_setup_code = 'infile = sys.stdin' 655 # Total pipe IO in this function is smaller than the minimum posix OS 656 # pipe buffer size of 512 bytes. No writer should block. 657 assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.' 658 659 child_code = ( 660 'import os, signal, sys ;' 661 'signal.signal(' 662 'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;' 663 + infile_setup_code + ' ;' + 664 'assert isinstance(infile, file) ;' 665 'sys.stderr.write("Go.\\n") ;' 666 + read_and_verify_code) 667 reader_process = subprocess.Popen( 668 [sys.executable, '-c', child_code], 669 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 670 stderr=subprocess.PIPE) 671 # Wait for the signal handler to be installed. 672 go = reader_process.stderr.read(4) 673 if go != 'Go.\n': 674 reader_process.kill() 675 self.fail('Error from %s process while awaiting "Go":\n%s' % ( 676 method_name, go+reader_process.stderr.read())) 677 reader_process.stdin.write(data_to_write) 678 signals_sent = 0 679 rlist = [] 680 # We don't know when the read_and_verify_code in our child is actually 681 # executing within the read system call we want to interrupt. This 682 # loop waits for a bit before sending the first signal to increase 683 # the likelihood of that. Implementations without correct EINTR 684 # and signal handling usually fail this test. 685 while not rlist: 686 rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05) 687 reader_process.send_signal(signal.SIGINT) 688 # Give the subprocess time to handle it before we loop around and 689 # send another one. On OSX the second signal happening close to 690 # immediately after the first was causing the subprocess to crash 691 # via the OS's default SIGINT handler. 692 time.sleep(0.1) 693 signals_sent += 1 694 if signals_sent > 200: 695 reader_process.kill() 696 self.fail("failed to handle signal during %s." % method_name) 697 # This assumes anything unexpected that writes to stderr will also 698 # write a newline. That is true of the traceback printing code. 699 signal_line = reader_process.stderr.readline() 700 if signal_line != '$\n': 701 reader_process.kill() 702 self.fail('Error from %s process while awaiting signal:\n%s' % ( 703 method_name, signal_line+reader_process.stderr.read())) 704 # We append a newline to our input so that a readline call can 705 # end on its own before the EOF is seen. 706 stdout, stderr = reader_process.communicate(input='\n') 707 if reader_process.returncode != 0: 708 self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % ( 709 method_name, reader_process.returncode, stdout, stderr)) 710 711 def test_readline(self, universal_newlines=False): 712 """file.readline must handle signals and not lose data.""" 713 self._test_reading( 714 data_to_write='hello, world!', 715 read_and_verify_code=( 716 'line = infile.readline() ;' 717 'expected_line = "hello, world!\\n" ;' 718 'assert line == expected_line, (' 719 '"read %r expected %r" % (line, expected_line))' 720 ), 721 method_name='readline', 722 universal_newlines=universal_newlines) 723 724 def test_readline_with_universal_newlines(self): 725 self.test_readline(universal_newlines=True) 726 727 def test_readlines(self, universal_newlines=False): 728 """file.readlines must handle signals and not lose data.""" 729 self._test_reading( 730 data_to_write='hello\nworld!', 731 read_and_verify_code=( 732 'lines = infile.readlines() ;' 733 'expected_lines = ["hello\\n", "world!\\n"] ;' 734 'assert lines == expected_lines, (' 735 '"readlines returned wrong data.\\n" ' 736 '"got lines %r\\nexpected %r" ' 737 '% (lines, expected_lines))' 738 ), 739 method_name='readlines', 740 universal_newlines=universal_newlines) 741 742 def test_readlines_with_universal_newlines(self): 743 self.test_readlines(universal_newlines=True) 744 745 def test_readall(self): 746 """Unbounded file.read() must handle signals and not lose data.""" 747 self._test_reading( 748 data_to_write='hello, world!abcdefghijklm', 749 read_and_verify_code=( 750 'data = infile.read() ;' 751 'expected_data = "hello, world!abcdefghijklm\\n";' 752 'assert data == expected_data, (' 753 '"read %r expected %r" % (data, expected_data))' 754 ), 755 method_name='unbounded read') 756 757 def test_readinto(self): 758 """file.readinto must handle signals and not lose data.""" 759 self._test_reading( 760 data_to_write='hello, world!', 761 read_and_verify_code=( 762 'data = bytearray(50) ;' 763 'num_read = infile.readinto(data) ;' 764 'expected_data = "hello, world!\\n";' 765 'assert data[:num_read] == expected_data, (' 766 '"read %r expected %r" % (data, expected_data))' 767 ), 768 method_name='readinto') 769 770 771class StdoutTests(unittest.TestCase): 772 773 def test_move_stdout_on_write(self): 774 # Issue 3242: sys.stdout can be replaced (and freed) during a 775 # print statement; prevent a segfault in this case 776 save_stdout = sys.stdout 777 778 class File: 779 def write(self, data): 780 if '\n' in data: 781 sys.stdout = save_stdout 782 783 try: 784 sys.stdout = File() 785 print "some text" 786 finally: 787 sys.stdout = save_stdout 788 789 def test_del_stdout_before_print(self): 790 # Issue 4597: 'print' with no argument wasn't reporting when 791 # sys.stdout was deleted. 792 save_stdout = sys.stdout 793 del sys.stdout 794 try: 795 print 796 except RuntimeError as e: 797 self.assertEqual(str(e), "lost sys.stdout") 798 else: 799 self.fail("Expected RuntimeError") 800 finally: 801 sys.stdout = save_stdout 802 803 def test_unicode(self): 804 import subprocess 805 806 def get_message(encoding, *code): 807 code = '\n'.join(code) 808 env = os.environ.copy() 809 env['PYTHONIOENCODING'] = encoding 810 process = subprocess.Popen([sys.executable, "-c", code], 811 stdout=subprocess.PIPE, env=env) 812 stdout, stderr = process.communicate() 813 self.assertEqual(process.returncode, 0) 814 return stdout 815 816 def check_message(text, encoding, expected): 817 stdout = get_message(encoding, 818 "import sys", 819 "sys.stdout.write(%r)" % text, 820 "sys.stdout.flush()") 821 self.assertEqual(stdout, expected) 822 823 # test the encoding 824 check_message(u'15\u20ac', "iso-8859-15", "15\xa4") 825 check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac') 826 check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20') 827 828 # test the error handler 829 check_message(u'15\u20ac', "iso-8859-1:ignore", "15") 830 check_message(u'15\u20ac', "iso-8859-1:replace", "15?") 831 check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac") 832 833 # test the buffer API 834 for objtype in ('buffer', 'bytearray'): 835 stdout = get_message('ascii', 836 'import sys', 837 r'sys.stdout.write(%s("\xe9"))' % objtype, 838 'sys.stdout.flush()') 839 self.assertEqual(stdout, "\xe9") 840 841 842def test_main(): 843 # Historically, these tests have been sloppy about removing TESTFN. 844 # So get rid of it no matter what. 845 try: 846 run_unittest(AutoFileTests, OtherFileTests, FileSubclassTests, 847 FileThreadingTests, TestFileSignalEINTR, StdoutTests) 848 finally: 849 if os.path.exists(TESTFN): 850 os.unlink(TESTFN) 851 852if __name__ == '__main__': 853 test_main() 854