1# Adapted from test_file.py by Daniel Stutzbach
2
3from __future__ import unicode_literals
4
5import sys
6import os
7import errno
8import unittest
9from array import array
10from weakref import proxy
11from functools import wraps
12from UserList import UserList
13import _testcapi
14
15from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd
16from test.test_support import py3k_bytes as bytes
17from test.script_helper import run_python
18
19from _io import FileIO as _FileIO
20
21class AutoFileTests(unittest.TestCase):
22    # file tests for which a test file is automatically set up
23
24    def setUp(self):
25        self.f = _FileIO(TESTFN, 'w')
26
27    def tearDown(self):
28        if self.f:
29            self.f.close()
30        os.remove(TESTFN)
31
32    def testWeakRefs(self):
33        # verify weak references
34        p = proxy(self.f)
35        p.write(bytes(range(10)))
36        self.assertEqual(self.f.tell(), p.tell())
37        self.f.close()
38        self.f = None
39        self.assertRaises(ReferenceError, getattr, p, 'tell')
40
41    def testSeekTell(self):
42        self.f.write(bytes(range(20)))
43        self.assertEqual(self.f.tell(), 20)
44        self.f.seek(0)
45        self.assertEqual(self.f.tell(), 0)
46        self.f.seek(10)
47        self.assertEqual(self.f.tell(), 10)
48        self.f.seek(5, 1)
49        self.assertEqual(self.f.tell(), 15)
50        self.f.seek(-5, 1)
51        self.assertEqual(self.f.tell(), 10)
52        self.f.seek(-5, 2)
53        self.assertEqual(self.f.tell(), 15)
54
55    def testAttributes(self):
56        # verify expected attributes exist
57        f = self.f
58
59        self.assertEqual(f.mode, "wb")
60        self.assertEqual(f.closed, False)
61
62        # verify the attributes are readonly
63        for attr in 'mode', 'closed':
64            self.assertRaises((AttributeError, TypeError),
65                              setattr, f, attr, 'oops')
66
67    def testReadinto(self):
68        # verify readinto
69        self.f.write(b"\x01\x02")
70        self.f.close()
71        a = array(b'b', b'x'*10)
72        self.f = _FileIO(TESTFN, 'r')
73        n = self.f.readinto(a)
74        self.assertEqual(array(b'b', [1, 2]), a[:n])
75
76    def testWritelinesList(self):
77        l = [b'123', b'456']
78        self.f.writelines(l)
79        self.f.close()
80        self.f = _FileIO(TESTFN, 'rb')
81        buf = self.f.read()
82        self.assertEqual(buf, b'123456')
83
84    def testWritelinesUserList(self):
85        l = UserList([b'123', b'456'])
86        self.f.writelines(l)
87        self.f.close()
88        self.f = _FileIO(TESTFN, 'rb')
89        buf = self.f.read()
90        self.assertEqual(buf, b'123456')
91
92    def testWritelinesError(self):
93        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
94        self.assertRaises(TypeError, self.f.writelines, None)
95
96    def test_none_args(self):
97        self.f.write(b"hi\nbye\nabc")
98        self.f.close()
99        self.f = _FileIO(TESTFN, 'r')
100        self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
101        self.f.seek(0)
102        self.assertEqual(self.f.readline(None), b"hi\n")
103        self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
104
105    def testRepr(self):
106        self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"
107                                       % (self.f.name, self.f.mode))
108        del self.f.name
109        self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"
110                                       % (self.f.fileno(), self.f.mode))
111        self.f.close()
112        self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
113
114    def testErrors(self):
115        f = self.f
116        self.assertTrue(not f.isatty())
117        self.assertTrue(not f.closed)
118        #self.assertEqual(f.name, TESTFN)
119        self.assertRaises(ValueError, f.read, 10) # Open for reading
120        f.close()
121        self.assertTrue(f.closed)
122        f = _FileIO(TESTFN, 'r')
123        self.assertRaises(TypeError, f.readinto, "")
124        self.assertTrue(not f.closed)
125        f.close()
126        self.assertTrue(f.closed)
127
128    def testMethods(self):
129        methods = ['fileno', 'isatty', 'read', 'readinto',
130                   'seek', 'tell', 'truncate', 'write', 'seekable',
131                   'readable', 'writable']
132        if sys.platform.startswith('atheos'):
133            methods.remove('truncate')
134
135        self.f.close()
136        self.assertTrue(self.f.closed)
137
138        for methodname in methods:
139            method = getattr(self.f, methodname)
140            # should raise on closed file
141            self.assertRaises(ValueError, method)
142
143    def testOpendir(self):
144        # Issue 3703: opening a directory should fill the errno
145        # Windows always returns "[Errno 13]: Permission denied
146        # Unix calls dircheck() and returns "[Errno 21]: Is a directory"
147        try:
148            _FileIO('.', 'r')
149        except IOError as e:
150            self.assertNotEqual(e.errno, 0)
151            self.assertEqual(e.filename, ".")
152        else:
153            self.fail("Should have raised IOError")
154
155    @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
156    def testOpenDirFD(self):
157        fd = os.open('.', os.O_RDONLY)
158        with self.assertRaises(IOError) as cm:
159            _FileIO(fd, 'r')
160        os.close(fd)
161        self.assertEqual(cm.exception.errno, errno.EISDIR)
162
163    #A set of functions testing that we get expected behaviour if someone has
164    #manually closed the internal file descriptor.  First, a decorator:
165    def ClosedFD(func):
166        @wraps(func)
167        def wrapper(self):
168            #forcibly close the fd before invoking the problem function
169            f = self.f
170            os.close(f.fileno())
171            try:
172                func(self, f)
173            finally:
174                try:
175                    self.f.close()
176                except IOError:
177                    pass
178        return wrapper
179
180    def ClosedFDRaises(func):
181        @wraps(func)
182        def wrapper(self):
183            #forcibly close the fd before invoking the problem function
184            f = self.f
185            os.close(f.fileno())
186            try:
187                func(self, f)
188            except IOError as e:
189                self.assertEqual(e.errno, errno.EBADF)
190            else:
191                self.fail("Should have raised IOError")
192            finally:
193                try:
194                    self.f.close()
195                except IOError:
196                    pass
197        return wrapper
198
199    @ClosedFDRaises
200    def testErrnoOnClose(self, f):
201        f.close()
202
203    @ClosedFDRaises
204    def testErrnoOnClosedWrite(self, f):
205        f.write('a')
206
207    @ClosedFDRaises
208    def testErrnoOnClosedSeek(self, f):
209        f.seek(0)
210
211    @ClosedFDRaises
212    def testErrnoOnClosedTell(self, f):
213        f.tell()
214
215    @ClosedFDRaises
216    def testErrnoOnClosedTruncate(self, f):
217        f.truncate(0)
218
219    @ClosedFD
220    def testErrnoOnClosedSeekable(self, f):
221        f.seekable()
222
223    @ClosedFD
224    def testErrnoOnClosedReadable(self, f):
225        f.readable()
226
227    @ClosedFD
228    def testErrnoOnClosedWritable(self, f):
229        f.writable()
230
231    @ClosedFD
232    def testErrnoOnClosedFileno(self, f):
233        f.fileno()
234
235    @ClosedFD
236    def testErrnoOnClosedIsatty(self, f):
237        self.assertEqual(f.isatty(), False)
238
239    def ReopenForRead(self):
240        try:
241            self.f.close()
242        except IOError:
243            pass
244        self.f = _FileIO(TESTFN, 'r')
245        os.close(self.f.fileno())
246        return self.f
247
248    @ClosedFDRaises
249    def testErrnoOnClosedRead(self, f):
250        f = self.ReopenForRead()
251        f.read(1)
252
253    @ClosedFDRaises
254    def testErrnoOnClosedReadall(self, f):
255        f = self.ReopenForRead()
256        f.readall()
257
258    @ClosedFDRaises
259    def testErrnoOnClosedReadinto(self, f):
260        f = self.ReopenForRead()
261        a = array(b'b', b'x'*10)
262        f.readinto(a)
263
264class OtherFileTests(unittest.TestCase):
265
266    def testAbles(self):
267        try:
268            f = _FileIO(TESTFN, "w")
269            self.assertEqual(f.readable(), False)
270            self.assertEqual(f.writable(), True)
271            self.assertEqual(f.seekable(), True)
272            f.close()
273
274            f = _FileIO(TESTFN, "r")
275            self.assertEqual(f.readable(), True)
276            self.assertEqual(f.writable(), False)
277            self.assertEqual(f.seekable(), True)
278            f.close()
279
280            f = _FileIO(TESTFN, "a+")
281            self.assertEqual(f.readable(), True)
282            self.assertEqual(f.writable(), True)
283            self.assertEqual(f.seekable(), True)
284            self.assertEqual(f.isatty(), False)
285            f.close()
286
287            if sys.platform != "win32":
288                try:
289                    f = _FileIO("/dev/tty", "a")
290                except EnvironmentError:
291                    # When run in a cron job there just aren't any
292                    # ttys, so skip the test.  This also handles other
293                    # OS'es that don't support /dev/tty.
294                    pass
295                else:
296                    self.assertEqual(f.readable(), False)
297                    self.assertEqual(f.writable(), True)
298                    if sys.platform != "darwin" and \
299                       'bsd' not in sys.platform and \
300                       not sys.platform.startswith('sunos'):
301                        # Somehow /dev/tty appears seekable on some BSDs
302                        self.assertEqual(f.seekable(), False)
303                    self.assertEqual(f.isatty(), True)
304                    f.close()
305        finally:
306            os.unlink(TESTFN)
307
308    def testModeStrings(self):
309        # check invalid mode strings
310        for mode in ("", "aU", "wU+", "rw", "rt"):
311            try:
312                f = _FileIO(TESTFN, mode)
313            except ValueError:
314                pass
315            else:
316                f.close()
317                self.fail('%r is an invalid file mode' % mode)
318
319    def testUnicodeOpen(self):
320        # verify repr works for unicode too
321        f = _FileIO(str(TESTFN), "w")
322        f.close()
323        os.unlink(TESTFN)
324
325    def testBytesOpen(self):
326        # Opening a bytes filename
327        try:
328            fn = TESTFN.encode("ascii")
329        except UnicodeEncodeError:
330            # Skip test
331            return
332        f = _FileIO(fn, "w")
333        try:
334            f.write(b"abc")
335            f.close()
336            with open(TESTFN, "rb") as f:
337                self.assertEqual(f.read(), b"abc")
338        finally:
339            os.unlink(TESTFN)
340
341    def testInvalidFd(self):
342        self.assertRaises(ValueError, _FileIO, -10)
343        self.assertRaises(OSError, _FileIO, make_bad_fd())
344        if sys.platform == 'win32':
345            import msvcrt
346            self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())
347        # Issue 15989
348        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
349        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
350
351    def testBadModeArgument(self):
352        # verify that we get a sensible error message for bad mode argument
353        bad_mode = "qwerty"
354        try:
355            f = _FileIO(TESTFN, bad_mode)
356        except ValueError as msg:
357            if msg.args[0] != 0:
358                s = str(msg)
359                if TESTFN in s or bad_mode not in s:
360                    self.fail("bad error message for invalid mode: %s" % s)
361            # if msg.args[0] == 0, we're probably on Windows where there may be
362            # no obvious way to discover why open() failed.
363        else:
364            f.close()
365            self.fail("no error for invalid mode: %s" % bad_mode)
366
367    def testTruncate(self):
368        f = _FileIO(TESTFN, 'w')
369        f.write(bytes(bytearray(range(10))))
370        self.assertEqual(f.tell(), 10)
371        f.truncate(5)
372        self.assertEqual(f.tell(), 10)
373        self.assertEqual(f.seek(0, os.SEEK_END), 5)
374        f.truncate(15)
375        self.assertEqual(f.tell(), 5)
376        self.assertEqual(f.seek(0, os.SEEK_END), 15)
377        f.close()
378
379    def testTruncateOnWindows(self):
380        def bug801631():
381            # SF bug <http://www.python.org/sf/801631>
382            # "file.truncate fault on windows"
383            f = _FileIO(TESTFN, 'w')
384            f.write(bytes(range(11)))
385            f.close()
386
387            f = _FileIO(TESTFN,'r+')
388            data = f.read(5)
389            if data != bytes(range(5)):
390                self.fail("Read on file opened for update failed %r" % data)
391            if f.tell() != 5:
392                self.fail("File pos after read wrong %d" % f.tell())
393
394            f.truncate()
395            if f.tell() != 5:
396                self.fail("File pos after ftruncate wrong %d" % f.tell())
397
398            f.close()
399            size = os.path.getsize(TESTFN)
400            if size != 5:
401                self.fail("File size after ftruncate wrong %d" % size)
402
403        try:
404            bug801631()
405        finally:
406            os.unlink(TESTFN)
407
408    def testAppend(self):
409        try:
410            f = open(TESTFN, 'wb')
411            f.write(b'spam')
412            f.close()
413            f = open(TESTFN, 'ab')
414            f.write(b'eggs')
415            f.close()
416            f = open(TESTFN, 'rb')
417            d = f.read()
418            f.close()
419            self.assertEqual(d, b'spameggs')
420        finally:
421            try:
422                os.unlink(TESTFN)
423            except:
424                pass
425
426    def testInvalidInit(self):
427        self.assertRaises(TypeError, _FileIO, "1", 0, 0)
428
429    def testWarnings(self):
430        with check_warnings(quiet=True) as w:
431            self.assertEqual(w.warnings, [])
432            self.assertRaises(TypeError, _FileIO, [])
433            self.assertEqual(w.warnings, [])
434            self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
435            self.assertEqual(w.warnings, [])
436
437    def test_surrogates(self):
438        # Issue #8438: try to open a filename containing surrogates.
439        # It should either fail because the file doesn't exist or the filename
440        # can't be represented using the filesystem encoding, but not because
441        # of a LookupError for the error handler "surrogateescape".
442        filename = u'\udc80.txt'
443        try:
444            with _FileIO(filename):
445                pass
446        except (UnicodeEncodeError, IOError):
447            pass
448        # Spawn a separate Python process with a different "file system
449        # default encoding", to exercise this further.
450        env = dict(os.environ)
451        env[b'LC_CTYPE'] = b'C'
452        _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)
453        if ('UnicodeEncodeError' not in out and not
454                ( ('IOError: [Errno 2] No such file or directory' in out) or
455                  ('IOError: [Errno 22] Invalid argument' in out) ) ):
456            self.fail('Bad output: %r' % out)
457
458    def testUnclosedFDOnException(self):
459        class MyException(Exception): pass
460        class MyFileIO(_FileIO):
461            def __setattr__(self, name, value):
462                if name == "name":
463                    raise MyException("blocked setting name")
464                return super(MyFileIO, self).__setattr__(name, value)
465        fd = os.open(__file__, os.O_RDONLY)
466        self.assertRaises(MyException, MyFileIO, fd)
467        os.close(fd)  # should not raise OSError(EBADF)
468
469def test_main():
470    # Historically, these tests have been sloppy about removing TESTFN.
471    # So get rid of it no matter what.
472    try:
473        run_unittest(AutoFileTests, OtherFileTests)
474    finally:
475        if os.path.exists(TESTFN):
476            os.unlink(TESTFN)
477
478if __name__ == '__main__':
479    test_main()
480