# test_file2k.py revision b0acc1b0a35905fa8dc3d3d10581602b5129c87a
1import sys
2import os
3import unittest
4import itertools
5import select
6import signal
7import subprocess
8import time
9from array import array
10from weakref import proxy
11try:
12    import threading
13except ImportError:
14    threading = None
15
16from test import test_support
17from test.test_support import TESTFN, run_unittest
18from UserList import UserList
19
class AutoFileTests(unittest.TestCase):
    # File tests for which a test file is automatically set up: setUp()
    # opens TESTFN for binary writing as self.f, and tearDown() closes
    # whatever self.f currently refers to and then removes TESTFN.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # A test that disposes of the file itself sets self.f to None.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Dropping the only strong reference must invalidate the proxy.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        # Only the first n items of the array are compared with the data.
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        lines = UserList(['1', '2'])
        self.f.writelines(lines)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        lines = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, lines)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testWritelinesBuffer(self):
        # verify writelines with an array('c') item instead of a str
        self.f.writelines([array('c', 'abc')])
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, 'abc')

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
        # see issue #14161
        # Windows doesn't like \r\n\t" in the file name, but ' is ok
        fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
        with open(fname, 'w') as f:
            # Registered only once the file exists, so the removal cannot
            # fail because open() itself failed.
            self.addCleanup(os.remove, fname)
            self.assertTrue(repr(f).startswith(
                    "<open file %r, mode 'w' at" % fname))

    def testErrors(self):
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # Every listed method must raise ValueError once the file is closed.
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately provoked error is caught here, so any
            # other exception still propagates and fails the test.
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # reading from a file opened only for writing must fail
        self.assertRaises(IOError, self.f.read)

    def testNastyWritelinesGenerator(self):
        # The generator closes the file part-way through the writelines()
        # call that is consuming it.
        def nasty():
            for i in range(5):
                if i == 3:
                    self.f.close()
                yield str(i)
        self.assertRaises(ValueError, self.f.writelines, nasty())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        # Read-type operations on files opened in write/append modes.
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        # Write-type operations on files opened in read modes.
        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
196
class OtherFileTests(unittest.TestCase):
    # File tests that create (and are responsible for removing) their own
    # files; there is no setUp()/tearDown() in this class.

    def testOpenDir(self):
        # Opening a directory, with or without an explicit mode, must raise
        # an IOError whose filename attribute is the directory.
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Close and remove the file even when the assertion fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start without TESTFN.  Whether an earlier test left it behind
        # depends on which tests ran, so remove it only if it is there.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN,'rb+')
            try:
                data = f.read(5)
                if data != '12345':
                    self.fail("Read on file opened for update failed %r" % data)
                if f.tell() != 5:
                    self.fail("File pos after read wrong %d" % f.tell())

                f.truncate()
                if f.tell() != 5:
                    self.fail("File pos after ftruncate wrong %d" % f.tell())
            finally:
                # Close on failure too, so the file can be removed below.
                f.close()

            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            bag = open(TESTFN, "w")
            bag.write(filler * nchunks)
            bag.writelines(testlines)
            bag.close()
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                try:
                    if f.next() != filler:
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                         (methodname, args))
                finally:
                    f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            try:
                for i in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                # testlines now holds only the lines not yet popped above.
                if lines != testlines:
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (lines, testlines))
            finally:
                # Close explicitly so the handle never outlives this step,
                # whether or not one of the checks above failed.
                f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)

    @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    def test_write_full(self):
        # Issue #17976
        try:
            f = open('/dev/full', 'w', 1)
        except IOError:
            self.skipTest("requires '/dev/full'")
        try:
            with self.assertRaises(IOError):
                f.write('hello')
                f.write('\n')
        finally:
            f.close()
438
class FileSubclassTests(unittest.TestCase):
    # Tests of the behaviour of subclasses of the built-in file type.

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                # Record that the override ran, then really close the file.
                self.subclass_closed = True
                file.close(self)

        with C(TESTFN, 'w') as f:
            # Opening in 'w' mode created TESTFN; remove it after the test.
            # Registered only once the file exists, so the removal cannot
            # fail because the open itself failed.
            self.addCleanup(os.remove, TESTFN)
        self.assertTrue(f.subclass_closed)
454
455
456@unittest.skipUnless(threading, 'Threading required for this test.')
457class FileThreadingTests(unittest.TestCase):
458    # These tests check the ability to call various methods of file objects
459    # (including close()) concurrently without crashing the Python interpreter.
460    # See #815646, #595601
461
462    def setUp(self):
463        self._threads = test_support.threading_setup()
464        self.f = None
465        self.filename = TESTFN
466        with open(self.filename, "w") as f:
467            f.write("\n".join("0123456789"))
468        self._count_lock = threading.Lock()
469        self.close_count = 0
470        self.close_success_count = 0
471        self.use_buffering = False
472
473    def tearDown(self):
474        if self.f:
475            try:
476                self.f.close()
477            except (EnvironmentError, ValueError):
478                pass
479        try:
480            os.remove(self.filename)
481        except EnvironmentError:
482            pass
483        test_support.threading_cleanup(*self._threads)
484
485    def _create_file(self):
486        if self.use_buffering:
487            self.f = open(self.filename, "w+", buffering=1024*16)
488        else:
489            self.f = open(self.filename, "w+")
490
491    def _close_file(self):
492        with self._count_lock:
493            self.close_count += 1
494        self.f.close()
495        with self._count_lock:
496            self.close_success_count += 1
497
498    def _close_and_reopen_file(self):
499        self._close_file()
500        # if close raises an exception thats fine, self.f remains valid so
501        # we don't need to reopen.
502        self._create_file()
503
504    def _run_workers(self, func, nb_workers, duration=0.2):
505        with self._count_lock:
506            self.close_count = 0
507            self.close_success_count = 0
508        self.do_continue = True
509        threads = []
510        try:
511            for i in range(nb_workers):
512                t = threading.Thread(target=func)
513                t.start()
514                threads.append(t)
515            for _ in xrange(100):
516                time.sleep(duration/100)
517                with self._count_lock:
518                    if self.close_count-self.close_success_count > nb_workers+1:
519                        if test_support.verbose:
520                            print 'Q',
521                        break
522            time.sleep(duration)
523        finally:
524            self.do_continue = False
525            for t in threads:
526                t.join()
527
528    def _test_close_open_io(self, io_func, nb_workers=5):
529        def worker():
530            self._create_file()
531            funcs = itertools.cycle((
532                lambda: io_func(),
533                lambda: self._close_and_reopen_file(),
534            ))
535            for f in funcs:
536                if not self.do_continue:
537                    break
538                try:
539                    f()
540                except (IOError, ValueError):
541                    pass
542        self._run_workers(worker, nb_workers)
543        if test_support.verbose:
544            # Useful verbose statistics when tuning this test to take
545            # less time to run but still ensuring that its still useful.
546            #
547            # the percent of close calls that raised an error
548            percent = 100. - 100.*self.close_success_count/self.close_count
549            print self.close_count, ('%.4f ' % percent),
550
551    def test_close_open(self):
552        def io_func():
553            pass
554        self._test_close_open_io(io_func)
555
556    def test_close_open_flush(self):
557        def io_func():
558            self.f.flush()
559        self._test_close_open_io(io_func)
560
561    def test_close_open_iter(self):
562        def io_func():
563            list(iter(self.f))
564        self._test_close_open_io(io_func)
565
566    def test_close_open_isatty(self):
567        def io_func():
568            self.f.isatty()
569        self._test_close_open_io(io_func)
570
571    def test_close_open_print(self):
572        def io_func():
573            print >> self.f, ''
574        self._test_close_open_io(io_func)
575
576    def test_close_open_print_buffered(self):
577        self.use_buffering = True
578        def io_func():
579            print >> self.f, ''
580        self._test_close_open_io(io_func)
581
582    def test_close_open_read(self):
583        def io_func():
584            self.f.read(0)
585        self._test_close_open_io(io_func)
586
587    def test_close_open_readinto(self):
588        def io_func():
589            a = array('c', 'xxxxx')
590            self.f.readinto(a)
591        self._test_close_open_io(io_func)
592
593    def test_close_open_readline(self):
594        def io_func():
595            self.f.readline()
596        self._test_close_open_io(io_func)
597
598    def test_close_open_readlines(self):
599        def io_func():
600            self.f.readlines()
601        self._test_close_open_io(io_func)
602
603    def test_close_open_seek(self):
604        def io_func():
605            self.f.seek(0, 0)
606        self._test_close_open_io(io_func)
607
608    def test_close_open_tell(self):
609        def io_func():
610            self.f.tell()
611        self._test_close_open_io(io_func)
612
613    def test_close_open_truncate(self):
614        def io_func():
615            self.f.truncate()
616        self._test_close_open_io(io_func)
617
618    def test_close_open_write(self):
619        def io_func():
620            self.f.write('')
621        self._test_close_open_io(io_func)
622
623    def test_close_open_writelines(self):
624        def io_func():
625            self.f.writelines('')
626        self._test_close_open_io(io_func)
627
628
@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
class TestFileSignalEINTR(unittest.TestCase):
    def _test_reading(self, data_to_write, read_and_verify_code, method_name,
                      universal_newlines=False):
        """Drive a child Python process through one signal-interrupted read.

        The child installs a SIGINT handler that writes "$" to its stderr,
        announces "Go." on stderr, and then runs read_and_verify_code.  The
        parent writes data_to_write to the child's stdin, sends SIGINT
        until the child's stderr becomes readable, checks that the handler
        output arrived, and finally sends a newline and closes the pipe.
        The test fails if the child exits with a non-zero status.

        Args:
            data_to_write: String sent to the child before any signal; it
                must be shorter than 512 bytes.
            read_and_verify_code: A single "line" of Python code that reads
                from a file object named 'infile' and checks the result.
            method_name: Name of the read method under test; used only in
                failure messages.
            universal_newlines: If True, every newline in data_to_write is
                sent as "\\r\\n" and the child reads its stdin through a
                file object opened in "rU" mode.
        """
        infile_setup_code = 'infile = sys.stdin'
        if universal_newlines:
            # Test the \r\n -> \n conversion while we're at it.
            data_to_write = data_to_write.replace('\n', '\r\n')
            infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
        # Everything written to the pipe here stays below 512 bytes, the
        # minimum posix pipe buffer size, so no writer should block.
        assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'

        child_code = ''.join([
            'import os, signal, sys ;'
            'signal.signal('
            'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;',
            infile_setup_code,
            ' ;'
            'assert isinstance(infile, file) ;'
            'sys.stderr.write("Go.\\n") ;',
            read_and_verify_code,
        ])
        child = subprocess.Popen(
                [sys.executable, '-c', child_code],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # The child prints "Go." once its signal handler is installed.
        banner = child.stderr.read(4)
        if banner != 'Go.\n':
            child.kill()
            self.fail('Error from %s process while awaiting "Go":\n%s' % (
                    method_name, banner+child.stderr.read()))
        child.stdin.write(data_to_write)
        attempts = 0
        readable = []
        # There is no way to know when the child is actually inside the
        # read system call we want to interrupt, so wait briefly before
        # each signal to make that more likely.  Implementations without
        # correct EINTR and signal handling usually fail this test.
        while not readable:
            readable = select.select([child.stderr], (), (), 0.05)[0]
            child.send_signal(signal.SIGINT)
            # Let the child handle the signal before looping around to send
            # another.  On OSX a second signal arriving almost immediately
            # after the first made the child die via the OS's default
            # SIGINT handler.
            time.sleep(0.1)
            attempts += 1
            if attempts > 200:
                child.kill()
                self.fail("failed to handle signal during %s." % method_name)
        # This relies on anything unexpected written to stderr also ending
        # in a newline, which holds for the traceback printing code.
        handler_output = child.stderr.readline()
        if handler_output != '$\n':
            child.kill()
            self.fail('Error from %s process while awaiting signal:\n%s' % (
                    method_name, handler_output+child.stderr.read()))
        # The trailing newline lets a readline call in the child complete
        # on its own before EOF is seen.
        stdout, stderr = child.communicate(input='\n')
        if child.returncode != 0:
            self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
                    method_name, child.returncode, stdout, stderr))

    def test_readline(self, universal_newlines=False):
        """file.readline must handle signals and not lose data."""
        snippet = (
                'line = infile.readline() ;'
                'expected_line = "hello, world!\\n" ;'
                'assert line == expected_line, ('
                '"read %r expected %r" % (line, expected_line))'
        )
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=snippet,
                method_name='readline',
                universal_newlines=universal_newlines)

    def test_readline_with_universal_newlines(self):
        self.test_readline(universal_newlines=True)

    def test_readlines(self, universal_newlines=False):
        """file.readlines must handle signals and not lose data."""
        snippet = (
                'lines = infile.readlines() ;'
                'expected_lines = ["hello\\n", "world!\\n"] ;'
                'assert lines == expected_lines, ('
                '"readlines returned wrong data.\\n" '
                '"got lines %r\\nexpected  %r" '
                '% (lines, expected_lines))'
        )
        self._test_reading(
                data_to_write='hello\nworld!',
                read_and_verify_code=snippet,
                method_name='readlines',
                universal_newlines=universal_newlines)

    def test_readlines_with_universal_newlines(self):
        self.test_readlines(universal_newlines=True)

    def test_readall(self):
        """Unbounded file.read() must handle signals and not lose data."""
        snippet = (
                'data = infile.read() ;'
                'expected_data = "hello, world!abcdefghijklm\\n";'
                'assert data == expected_data, ('
                '"read %r expected %r" % (data, expected_data))'
        )
        self._test_reading(
                data_to_write='hello, world!abcdefghijklm',
                read_and_verify_code=snippet,
                method_name='unbounded read')

    def test_readinto(self):
        """file.readinto must handle signals and not lose data."""
        snippet = (
                'data = bytearray(50) ;'
                'num_read = infile.readinto(data) ;'
                'expected_data = "hello, world!\\n";'
                'assert data[:num_read] == expected_data, ('
                '"read %r expected %r" % (data, expected_data))'
        )
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=snippet,
                method_name='readinto')
769
770
771class StdoutTests(unittest.TestCase):
772
773    def test_move_stdout_on_write(self):
774        # Issue 3242: sys.stdout can be replaced (and freed) during a
775        # print statement; prevent a segfault in this case
776        save_stdout = sys.stdout
777
778        class File:
779            def write(self, data):
780                if '\n' in data:
781                    sys.stdout = save_stdout
782
783        try:
784            sys.stdout = File()
785            print "some text"
786        finally:
787            sys.stdout = save_stdout
788
789    def test_del_stdout_before_print(self):
790        # Issue 4597: 'print' with no argument wasn't reporting when
791        # sys.stdout was deleted.
792        save_stdout = sys.stdout
793        del sys.stdout
794        try:
795            print
796        except RuntimeError as e:
797            self.assertEqual(str(e), "lost sys.stdout")
798        else:
799            self.fail("Expected RuntimeError")
800        finally:
801            sys.stdout = save_stdout
802
803    def test_unicode(self):
804        import subprocess
805
806        def get_message(encoding, *code):
807            code = '\n'.join(code)
808            env = os.environ.copy()
809            env['PYTHONIOENCODING'] = encoding
810            process = subprocess.Popen([sys.executable, "-c", code],
811                                       stdout=subprocess.PIPE, env=env)
812            stdout, stderr = process.communicate()
813            self.assertEqual(process.returncode, 0)
814            return stdout
815
816        def check_message(text, encoding, expected):
817            stdout = get_message(encoding,
818                "import sys",
819                "sys.stdout.write(%r)" % text,
820                "sys.stdout.flush()")
821            self.assertEqual(stdout, expected)
822
823        # test the encoding
824        check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
825        check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
826        check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
827
828        # test the error handler
829        check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
830        check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
831        check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
832
833        # test the buffer API
834        for objtype in ('buffer', 'bytearray'):
835            stdout = get_message('ascii',
836                'import sys',
837                r'sys.stdout.write(%s("\xe9"))' % objtype,
838                'sys.stdout.flush()')
839            self.assertEqual(stdout, "\xe9")
840
841
def test_main():
    # Run every test class in this module.  Historically these tests have
    # been sloppy about removing TESTFN, so get rid of it afterwards no
    # matter how the run ended.
    try:
        cases = (AutoFileTests, OtherFileTests, FileSubclassTests,
                 FileThreadingTests, TestFileSignalEINTR, StdoutTests)
        run_unittest(*cases)
    finally:
        left_behind = os.path.exists(TESTFN)
        if left_behind:
            os.unlink(TESTFN)
851
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
854