1import sys
2import os
3import unittest
4import itertools
5import select
6import signal
7import subprocess
8import time
9from array import array
10from weakref import proxy
11try:
12    import threading
13except ImportError:
14    threading = None
15
16from test import test_support
17from test.test_support import TESTFN, run_unittest
18from UserList import UserList
19
class AutoFileTests(unittest.TestCase):
    """Tests of the built-in ``file`` type that share one fixture file.

    ``setUp`` opens ``TESTFN`` for binary writing and stores the file object
    as ``self.f``; ``tearDown`` closes ``self.f`` (when it is still set) and
    removes ``TESTFN``.
    """
    # file tests for which a test file is automatically set up

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        # Tests that must drop their reference set self.f to None themselves.
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Dropping the only strong reference must invalidate the proxy.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
        # see issue #14161
        # Windows doesn't like \r\n\t" in the file name, but ' is ok
        fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
        with open(fname, 'w') as f:
            self.addCleanup(os.remove, fname)
            self.assertTrue(repr(f).startswith(
                    "<open file %r, mode 'w' at" % fname))

    def testErrors(self):
        # Basic state checks on a file reopened for reading.
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # Every I/O method must raise ValueError once the file is closed.
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertEqual(self.f.__exit__(None, None, None), None)
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            # Only the deliberately provoked error is expected here; a bare
            # except would also swallow KeyboardInterrupt/SystemExit.
            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)

    def testReadWhenWriting(self):
        # Reading from a file opened write-only must fail.
        self.assertRaises(IOError, self.f.read)

    def testNastyWritelinesGenerator(self):
        # A generator that closes the file mid-iteration must make
        # writelines() raise ValueError.
        def nasty():
            for i in range(5):
                if i == 3:
                    self.f.close()
                yield str(i)
        self.assertRaises(ValueError, self.f.writelines, nasty())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        # Read-style operations on files opened for writing/appending.
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        # Write-style operations on files opened for reading.
        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
189
class OtherFileTests(unittest.TestCase):
    """File tests that create and manage their own files (no shared fixture)."""

    def tearDown(self):
        # Several tests create TESTFN.  Remove it after every test so that no
        # test depends on what another one left behind; test_support.unlink()
        # tolerates a file that is already gone.
        test_support.unlink(TESTFN)

    def testOpenDir(self):
        # Opening a directory must fail with an IOError naming the directory.
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                # Do not leak the handle when the open unexpectedly succeeds.
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdin(self):
        # This causes the interpreter to exit on OSF1 v5.1.
        if sys.platform != 'osf1V5':
            self.assertRaises(IOError, sys.stdin.seek, -1)
        else:
            print >>sys.__stdout__, (
                '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
                ' Test manually.')
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Close and remove the file even when the assertion fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # Start from a clean slate.  The tolerant unlink keeps this test
        # runnable on its own, when no earlier test has created TESTFN.
        test_support.unlink(TESTFN)

        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = open(TESTFN, 'wb')
            f.write('12345678901')   # 11 bytes
            f.close()

            f = open(TESTFN,'rb+')
            data = f.read(5)
            if data != '12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            with open(TESTFN, "w") as bag:
                bag.write(filler * nchunks)
                bag.writelines(testlines)
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                try:
                    if f.next() != filler:
                        # This used to be the no-op expression
                        # ``self.fail, "Broken testfile"``.
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                         (methodname, args))
                finally:
                    f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            try:
                for i in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # Report the values that were actually compared.
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (lines, testlines))
            finally:
                f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)
417
class FileSubclassTests(unittest.TestCase):
    """Checks that ``file`` subclasses cooperate with the ``with`` statement."""

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                self.subclass_closed = True
                file.close(self)

        # Opening in 'w' mode creates TESTFN; make sure it does not outlive
        # the test.  test_support.unlink() tolerates a missing file.
        self.addCleanup(test_support.unlink, TESTFN)
        with C(TESTFN, 'w') as f:
            pass
        self.assertTrue(f.subclass_closed)
433
434
@unittest.skipUnless(threading, 'Threading required for this test.')
class FileThreadingTests(unittest.TestCase):
    """Stress tests that call file methods and close() from several threads.

    Each ``test_close_open_*`` method defines a small ``io_func`` and hands it
    to ``_test_close_open_io``, which runs worker threads that alternate
    between that call and closing/reopening the shared ``self.f``.
    """
    # These tests check the ability to call various methods of file objects
    # (including close()) concurrently without crashing the Python interpreter.
    # See #815646, #595601

    def setUp(self):
        self._threads = test_support.threading_setup()
        self.f = None
        self.filename = TESTFN
        # Seed the file with ten one-digit lines.
        with open(self.filename, "w") as f:
            f.write("\n".join("0123456789"))
        # Guards close_count and close_success_count.
        self._count_lock = threading.Lock()
        self.close_count = 0
        self.close_success_count = 0
        self.use_buffering = False

    def tearDown(self):
        # Best-effort cleanup: errors from closing or removing are ignored.
        if self.f:
            try:
                self.f.close()
            except (EnvironmentError, ValueError):
                pass
        try:
            os.remove(self.filename)
        except EnvironmentError:
            pass
        test_support.threading_cleanup(*self._threads)

    def _create_file(self):
        """(Re)open ``self.filename`` in "w+" mode and store it as ``self.f``.

        A 16 KiB buffer is requested when ``self.use_buffering`` is true.
        """
        if self.use_buffering:
            self.f = open(self.filename, "w+", buffering=1024*16)
        else:
            self.f = open(self.filename, "w+")

    def _close_file(self):
        """Close ``self.f``, counting the attempt and, if it returns, the success."""
        with self._count_lock:
            self.close_count += 1
        self.f.close()
        # Only reached when close() did not raise.
        with self._count_lock:
            self.close_success_count += 1

    def _close_and_reopen_file(self):
        """Close ``self.f`` and, when that succeeds, open a fresh file."""
        self._close_file()
        # if close raises an exception that's fine, self.f remains valid so
        # we don't need to reopen.
        self._create_file()

    def _run_workers(self, func, nb_workers, duration=0.2):
        """Run ``func`` in ``nb_workers`` threads for a limited time.

        The counters are reset, the threads are started, and the main thread
        polls up to 100 times (sleeping ``duration/100`` each time), leaving
        the polling loop early when the number of close() calls that have not
        (yet) succeeded exceeds ``nb_workers + 1``.  It then sleeps for
        ``duration``, clears ``self.do_continue`` and joins all threads.
        """
        with self._count_lock:
            self.close_count = 0
            self.close_success_count = 0
        # Workers poll this flag to know when to stop.
        self.do_continue = True
        threads = []
        try:
            for i in range(nb_workers):
                t = threading.Thread(target=func)
                t.start()
                threads.append(t)
            for _ in xrange(100):
                time.sleep(duration/100)
                with self._count_lock:
                    if self.close_count-self.close_success_count > nb_workers+1:
                        if test_support.verbose:
                            print 'Q',
                        break
            time.sleep(duration)
        finally:
            # Always stop and join the workers, even if an error occurred.
            self.do_continue = False
            for t in threads:
                t.join()

    def _test_close_open_io(self, io_func, nb_workers=5):
        """Run workers that alternate ``io_func`` with close-and-reopen.

        IOError and ValueError raised by either call are ignored inside the
        workers.
        """
        def worker():
            self._create_file()
            funcs = itertools.cycle((
                lambda: io_func(),
                lambda: self._close_and_reopen_file(),
            ))
            for f in funcs:
                if not self.do_continue:
                    break
                try:
                    f()
                except (IOError, ValueError):
                    pass
        self._run_workers(worker, nb_workers)
        if test_support.verbose:
            # Useful verbose statistics when tuning this test to take
            # less time to run but still ensuring that it is still useful.
            #
            # the percent of close calls that raised an error
            percent = 100. - 100.*self.close_success_count/self.close_count
            print self.close_count, ('%.4f ' % percent),

    # Each test below exercises one file operation against concurrent
    # close()/reopen cycles.

    def test_close_open(self):
        def io_func():
            pass
        self._test_close_open_io(io_func)

    def test_close_open_flush(self):
        def io_func():
            self.f.flush()
        self._test_close_open_io(io_func)

    def test_close_open_iter(self):
        def io_func():
            list(iter(self.f))
        self._test_close_open_io(io_func)

    def test_close_open_isatty(self):
        def io_func():
            self.f.isatty()
        self._test_close_open_io(io_func)

    def test_close_open_print(self):
        def io_func():
            print >> self.f, ''
        self._test_close_open_io(io_func)

    def test_close_open_print_buffered(self):
        # Same as test_close_open_print, but with a buffered file.
        self.use_buffering = True
        def io_func():
            print >> self.f, ''
        self._test_close_open_io(io_func)

    def test_close_open_read(self):
        def io_func():
            self.f.read(0)
        self._test_close_open_io(io_func)

    def test_close_open_readinto(self):
        def io_func():
            a = array('c', 'xxxxx')
            self.f.readinto(a)
        self._test_close_open_io(io_func)

    def test_close_open_readline(self):
        def io_func():
            self.f.readline()
        self._test_close_open_io(io_func)

    def test_close_open_readlines(self):
        def io_func():
            self.f.readlines()
        self._test_close_open_io(io_func)

    def test_close_open_seek(self):
        def io_func():
            self.f.seek(0, 0)
        self._test_close_open_io(io_func)

    def test_close_open_tell(self):
        def io_func():
            self.f.tell()
        self._test_close_open_io(io_func)

    def test_close_open_truncate(self):
        def io_func():
            self.f.truncate()
        self._test_close_open_io(io_func)

    def test_close_open_write(self):
        def io_func():
            self.f.write('')
        self._test_close_open_io(io_func)

    def test_close_open_writelines(self):
        def io_func():
            self.f.writelines('')
        self._test_close_open_io(io_func)
606
607
@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
class TestFileSignalEINTR(unittest.TestCase):
    """Checks that file read methods survive a signal arriving mid-read.

    Every test runs a child Python process that reads from its stdin while
    the parent sends it SIGINT; see ``_test_reading`` for the harness.
    """
    def _test_reading(self, data_to_write, read_and_verify_code, method_name,
                      universal_newlines=False):
        """Generic buffered read method test harness to verify EINTR behavior.

        Also validates that Python signal handlers are run during the read.

        Args:
            data_to_write: String to write to the child process for reading
                before sending it a signal, confirming the signal was handled,
                writing a final newline char and closing the infile pipe.
            read_and_verify_code: Single "line" of code to read from a file
                object named 'infile' and validate the result.  This will be
                executed as part of a python subprocess fed data_to_write.
            method_name: The name of the read method being tested, for use in
                an error message on failure.
            universal_newlines: If True, infile will be opened in universal
                newline mode in the child process.
        """
        if universal_newlines:
            # Test the \r\n -> \n conversion while we're at it.
            data_to_write = data_to_write.replace('\n', '\r\n')
            infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
        else:
            infile_setup_code = 'infile = sys.stdin'
        # Total pipe IO in this function is smaller than the minimum posix OS
        # pipe buffer size of 512 bytes.  No writer should block.
        assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'

        # The child installs a SIGINT handler that writes "$\n" to stderr,
        # announces readiness with "Go.\n", then runs the code under test.
        child_code = (
             'import os, signal, sys ;'
             'signal.signal('
                     'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
             + infile_setup_code + ' ;' +
             'assert isinstance(infile, file) ;'
             'sys.stderr.write("Go.\\n") ;'
             + read_and_verify_code)
        reader_process = subprocess.Popen(
                [sys.executable, '-c', child_code],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Wait for the signal handler to be installed.
        go = reader_process.stderr.read(4)
        if go != 'Go.\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting "Go":\n%s' % (
                    method_name, go+reader_process.stderr.read()))
        reader_process.stdin.write(data_to_write)
        signals_sent = 0
        rlist = []
        # We don't know when the read_and_verify_code in our child is actually
        # executing within the read system call we want to interrupt.  This
        # loop waits for a bit before sending the first signal to increase
        # the likelihood of that.  Implementations without correct EINTR
        # and signal handling usually fail this test.
        while not rlist:
            rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
            reader_process.send_signal(signal.SIGINT)
            # Give the subprocess time to handle it before we loop around and
            # send another one.  On OSX the second signal happening close to
            # immediately after the first was causing the subprocess to crash
            # via the OS's default SIGINT handler.
            time.sleep(0.1)
            signals_sent += 1
            # Give up after more than 200 signals without any stderr output.
            if signals_sent > 200:
                reader_process.kill()
                self.fail("failed to handle signal during %s." % method_name)
        # This assumes anything unexpected that writes to stderr will also
        # write a newline.  That is true of the traceback printing code.
        signal_line = reader_process.stderr.readline()
        if signal_line != '$\n':
            reader_process.kill()
            self.fail('Error from %s process while awaiting signal:\n%s' % (
                    method_name, signal_line+reader_process.stderr.read()))
        # We append a newline to our input so that a readline call can
        # end on its own before the EOF is seen.
        stdout, stderr = reader_process.communicate(input='\n')
        # A failed assertion inside the child shows up as a nonzero exit code.
        if reader_process.returncode != 0:
            self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
                    method_name, reader_process.returncode, stdout, stderr))

    def test_readline(self, universal_newlines=False):
        """file.readline must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'line = infile.readline() ;'
                        'expected_line = "hello, world!\\n" ;'
                        'assert line == expected_line, ('
                        '"read %r expected %r" % (line, expected_line))'
                ),
                method_name='readline',
                universal_newlines=universal_newlines)

    def test_readline_with_universal_newlines(self):
        # Same check as test_readline, with the child reading in "rU" mode.
        self.test_readline(universal_newlines=True)

    def test_readlines(self, universal_newlines=False):
        """file.readlines must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello\nworld!',
                read_and_verify_code=(
                        'lines = infile.readlines() ;'
                        'expected_lines = ["hello\\n", "world!\\n"] ;'
                        'assert lines == expected_lines, ('
                        '"readlines returned wrong data.\\n" '
                        '"got lines %r\\nexpected  %r" '
                        '% (lines, expected_lines))'
                ),
                method_name='readlines',
                universal_newlines=universal_newlines)

    def test_readlines_with_universal_newlines(self):
        # Same check as test_readlines, with the child reading in "rU" mode.
        self.test_readlines(universal_newlines=True)

    def test_readall(self):
        """Unbounded file.read() must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!abcdefghijklm',
                read_and_verify_code=(
                        'data = infile.read() ;'
                        'expected_data = "hello, world!abcdefghijklm\\n";'
                        'assert data == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='unbounded read')

    def test_readinto(self):
        """file.readinto must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'data = bytearray(50) ;'
                        'num_read = infile.readinto(data) ;'
                        'expected_data = "hello, world!\\n";'
                        'assert data[:num_read] == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='readinto')
748
749
750class StdoutTests(unittest.TestCase):
751
752    def test_move_stdout_on_write(self):
753        # Issue 3242: sys.stdout can be replaced (and freed) during a
754        # print statement; prevent a segfault in this case
755        save_stdout = sys.stdout
756
757        class File:
758            def write(self, data):
759                if '\n' in data:
760                    sys.stdout = save_stdout
761
762        try:
763            sys.stdout = File()
764            print "some text"
765        finally:
766            sys.stdout = save_stdout
767
768    def test_del_stdout_before_print(self):
769        # Issue 4597: 'print' with no argument wasn't reporting when
770        # sys.stdout was deleted.
771        save_stdout = sys.stdout
772        del sys.stdout
773        try:
774            print
775        except RuntimeError as e:
776            self.assertEqual(str(e), "lost sys.stdout")
777        else:
778            self.fail("Expected RuntimeError")
779        finally:
780            sys.stdout = save_stdout
781
782    def test_unicode(self):
783        import subprocess
784
785        def get_message(encoding, *code):
786            code = '\n'.join(code)
787            env = os.environ.copy()
788            env['PYTHONIOENCODING'] = encoding
789            process = subprocess.Popen([sys.executable, "-c", code],
790                                       stdout=subprocess.PIPE, env=env)
791            stdout, stderr = process.communicate()
792            self.assertEqual(process.returncode, 0)
793            return stdout
794
795        def check_message(text, encoding, expected):
796            stdout = get_message(encoding,
797                "import sys",
798                "sys.stdout.write(%r)" % text,
799                "sys.stdout.flush()")
800            self.assertEqual(stdout, expected)
801
802        # test the encoding
803        check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
804        check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
805        check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
806
807        # test the error handler
808        check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
809        check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
810        check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
811
812        # test the buffer API
813        for objtype in ('buffer', 'bytearray'):
814            stdout = get_message('ascii',
815                'import sys',
816                r'sys.stdout.write(%s("\xe9"))' % objtype,
817                'sys.stdout.flush()')
818            self.assertEqual(stdout, "\xe9")
819
820
def test_main():
    """Run every test case class of this module, then remove ``TESTFN``."""
    test_classes = (
        AutoFileTests,
        OtherFileTests,
        FileSubclassTests,
        FileThreadingTests,
        TestFileSignalEINTR,
        StdoutTests,
    )
    try:
        run_unittest(*test_classes)
    finally:
        # Historically, these tests have been sloppy about removing TESTFN.
        # So get rid of it no matter what.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)
830
# Allow running this test module directly as a script.
if __name__ == '__main__':
    test_main()
833