1# NOTE: this file tests the new `io` library backported from Python 3.x.
2# Similar tests for the builtin file object can be found in test_file2k.py.
3
4from __future__ import print_function
5
6import sys
7import os
8import unittest
9from array import array
10from weakref import proxy
11
12import io
13import _pyio as pyio
14
15from test.test_support import TESTFN, run_unittest
16from UserList import UserList
17
18class AutoFileTests(unittest.TestCase):
19    # file tests for which a test file is automatically set up
20
21    def setUp(self):
22        self.f = self.open(TESTFN, 'wb')
23
24    def tearDown(self):
25        if self.f:
26            self.f.close()
27        os.remove(TESTFN)
28
29    def testWeakRefs(self):
30        # verify weak references
31        p = proxy(self.f)
32        p.write(b'teststring')
33        self.assertEqual(self.f.tell(), p.tell())
34        self.f.close()
35        self.f = None
36        self.assertRaises(ReferenceError, getattr, p, 'tell')
37
38    def testAttributes(self):
39        # verify expected attributes exist
40        f = self.f
41        f.name     # merely shouldn't blow up
42        f.mode     # ditto
43        f.closed   # ditto
44
45    def testReadinto(self):
46        # verify readinto
47        self.f.write(b'12')
48        self.f.close()
49        a = array('b', b'x'*10)
50        self.f = self.open(TESTFN, 'rb')
51        n = self.f.readinto(a)
52        self.assertEqual(b'12', a.tostring()[:n])
53
54    def testReadinto_text(self):
55        # verify readinto refuses text files
56        a = array('b', b'x'*10)
57        self.f.close()
58        self.f = self.open(TESTFN, 'r')
59        if hasattr(self.f, "readinto"):
60            self.assertRaises(TypeError, self.f.readinto, a)
61
62    def testWritelinesUserList(self):
63        # verify writelines with instance sequence
64        l = UserList([b'1', b'2'])
65        self.f.writelines(l)
66        self.f.close()
67        self.f = self.open(TESTFN, 'rb')
68        buf = self.f.read()
69        self.assertEqual(buf, b'12')
70
71    def testWritelinesIntegers(self):
72        # verify writelines with integers
73        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
74
75    def testWritelinesIntegersUserList(self):
76        # verify writelines with integers in UserList
77        l = UserList([1,2,3])
78        self.assertRaises(TypeError, self.f.writelines, l)
79
80    def testWritelinesNonString(self):
81        # verify writelines with non-string object
82        class NonString:
83            pass
84
85        self.assertRaises(TypeError, self.f.writelines,
86                          [NonString(), NonString()])
87
88    def testErrors(self):
89        f = self.f
90        self.assertEqual(f.name, TESTFN)
91        self.assertTrue(not f.isatty())
92        self.assertTrue(not f.closed)
93
94        if hasattr(f, "readinto"):
95            self.assertRaises((IOError, TypeError), f.readinto, "")
96        f.close()
97        self.assertTrue(f.closed)
98
99    def testMethods(self):
100        methods = [('fileno', ()),
101                   ('flush', ()),
102                   ('isatty', ()),
103                   ('next', ()),
104                   ('read', ()),
105                   ('write', (b"",)),
106                   ('readline', ()),
107                   ('readlines', ()),
108                   ('seek', (0,)),
109                   ('tell', ()),
110                   ('write', (b"",)),
111                   ('writelines', ([],)),
112                   ('__iter__', ()),
113                   ]
114        if not sys.platform.startswith('atheos'):
115            methods.append(('truncate', ()))
116
117        # __exit__ should close the file
118        self.f.__exit__(None, None, None)
119        self.assertTrue(self.f.closed)
120
121        for methodname, args in methods:
122            method = getattr(self.f, methodname)
123            # should raise on closed file
124            self.assertRaises(ValueError, method, *args)
125
126        # file is closed, __exit__ shouldn't do anything
127        self.assertEqual(self.f.__exit__(None, None, None), None)
128        # it must also return None if an exception was given
129        try:
130            1 // 0
131        except:
132            self.assertEqual(self.f.__exit__(*sys.exc_info()), None)
133
134    def testReadWhenWriting(self):
135        self.assertRaises(IOError, self.f.read)
136
137class CAutoFileTests(AutoFileTests):
138    open = io.open
139
140class PyAutoFileTests(AutoFileTests):
141    open = staticmethod(pyio.open)
142
143
144class OtherFileTests(unittest.TestCase):
145
146    def testModeStrings(self):
147        # check invalid mode strings
148        for mode in ("", "aU", "wU+"):
149            try:
150                f = self.open(TESTFN, mode)
151            except ValueError:
152                pass
153            else:
154                f.close()
155                self.fail('%r is an invalid file mode' % mode)
156
157    def testStdin(self):
158        # This causes the interpreter to exit on OSF1 v5.1.
159        if sys.platform != 'osf1V5':
160            self.assertRaises((IOError, ValueError), sys.stdin.seek, -1)
161        else:
162            print((
163                '  Skipping sys.stdin.seek(-1), it may crash the interpreter.'
164                ' Test manually.'), file=sys.__stdout__)
165        self.assertRaises((IOError, ValueError), sys.stdin.truncate)
166
167    def testBadModeArgument(self):
168        # verify that we get a sensible error message for bad mode argument
169        bad_mode = "qwerty"
170        try:
171            f = self.open(TESTFN, bad_mode)
172        except ValueError as msg:
173            if msg.args[0] != 0:
174                s = str(msg)
175                if TESTFN in s or bad_mode not in s:
176                    self.fail("bad error message for invalid mode: %s" % s)
177            # if msg.args[0] == 0, we're probably on Windows where there may be
178            # no obvious way to discover why open() failed.
179        else:
180            f.close()
181            self.fail("no error for invalid mode: %s" % bad_mode)
182
183    def testSetBufferSize(self):
184        # make sure that explicitly setting the buffer size doesn't cause
185        # misbehaviour especially with repeated close() calls
186        for s in (-1, 0, 1, 512):
187            try:
188                f = self.open(TESTFN, 'wb', s)
189                f.write(str(s).encode("ascii"))
190                f.close()
191                f.close()
192                f = self.open(TESTFN, 'rb', s)
193                d = int(f.read().decode("ascii"))
194                f.close()
195                f.close()
196            except IOError as msg:
197                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
198            self.assertEqual(d, s)
199
200    def testTruncateOnWindows(self):
201        # SF bug <http://www.python.org/sf/801631>
202        # "file.truncate fault on windows"
203
204        os.unlink(TESTFN)
205        f = self.open(TESTFN, 'wb')
206
207        try:
208            f.write(b'12345678901')   # 11 bytes
209            f.close()
210
211            f = self.open(TESTFN,'rb+')
212            data = f.read(5)
213            if data != b'12345':
214                self.fail("Read on file opened for update failed %r" % data)
215            if f.tell() != 5:
216                self.fail("File pos after read wrong %d" % f.tell())
217
218            f.truncate()
219            if f.tell() != 5:
220                self.fail("File pos after ftruncate wrong %d" % f.tell())
221
222            f.close()
223            size = os.path.getsize(TESTFN)
224            if size != 5:
225                self.fail("File size after ftruncate wrong %d" % size)
226        finally:
227            f.close()
228            os.unlink(TESTFN)
229
230    def testIteration(self):
231        # Test the complex interaction when mixing file-iteration and the
232        # various read* methods.
233        dataoffset = 16384
234        filler = b"ham\n"
235        assert not dataoffset % len(filler), \
236            "dataoffset must be multiple of len(filler)"
237        nchunks = dataoffset // len(filler)
238        testlines = [
239            b"spam, spam and eggs\n",
240            b"eggs, spam, ham and spam\n",
241            b"saussages, spam, spam and eggs\n",
242            b"spam, ham, spam and eggs\n",
243            b"spam, spam, spam, spam, spam, ham, spam\n",
244            b"wonderful spaaaaaam.\n"
245        ]
246        methods = [("readline", ()), ("read", ()), ("readlines", ()),
247                   ("readinto", (array("b", b" "*100),))]
248
249        try:
250            # Prepare the testfile
251            bag = self.open(TESTFN, "wb")
252            bag.write(filler * nchunks)
253            bag.writelines(testlines)
254            bag.close()
255            # Test for appropriate errors mixing read* and iteration
256            for methodname, args in methods:
257                f = self.open(TESTFN, 'rb')
258                if next(f) != filler:
259                    self.fail, "Broken testfile"
260                meth = getattr(f, methodname)
261                meth(*args)  # This simply shouldn't fail
262                f.close()
263
264            # Test to see if harmless (by accident) mixing of read* and
265            # iteration still works. This depends on the size of the internal
266            # iteration buffer (currently 8192,) but we can test it in a
267            # flexible manner.  Each line in the bag o' ham is 4 bytes
268            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
269            # exactly on the buffer boundary for any power-of-2 buffersize
270            # between 4 and 16384 (inclusive).
271            f = self.open(TESTFN, 'rb')
272            for i in range(nchunks):
273                next(f)
274            testline = testlines.pop(0)
275            try:
276                line = f.readline()
277            except ValueError:
278                self.fail("readline() after next() with supposedly empty "
279                          "iteration-buffer failed anyway")
280            if line != testline:
281                self.fail("readline() after next() with empty buffer "
282                          "failed. Got %r, expected %r" % (line, testline))
283            testline = testlines.pop(0)
284            buf = array("b", b"\x00" * len(testline))
285            try:
286                f.readinto(buf)
287            except ValueError:
288                self.fail("readinto() after next() with supposedly empty "
289                          "iteration-buffer failed anyway")
290            line = buf.tostring()
291            if line != testline:
292                self.fail("readinto() after next() with empty buffer "
293                          "failed. Got %r, expected %r" % (line, testline))
294
295            testline = testlines.pop(0)
296            try:
297                line = f.read(len(testline))
298            except ValueError:
299                self.fail("read() after next() with supposedly empty "
300                          "iteration-buffer failed anyway")
301            if line != testline:
302                self.fail("read() after next() with empty buffer "
303                          "failed. Got %r, expected %r" % (line, testline))
304            try:
305                lines = f.readlines()
306            except ValueError:
307                self.fail("readlines() after next() with supposedly empty "
308                          "iteration-buffer failed anyway")
309            if lines != testlines:
310                self.fail("readlines() after next() with empty buffer "
311                          "failed. Got %r, expected %r" % (line, testline))
312            # Reading after iteration hit EOF shouldn't hurt either
313            f = self.open(TESTFN, 'rb')
314            try:
315                for line in f:
316                    pass
317                try:
318                    f.readline()
319                    f.readinto(buf)
320                    f.read()
321                    f.readlines()
322                except ValueError:
323                    self.fail("read* failed after next() consumed file")
324            finally:
325                f.close()
326        finally:
327            os.unlink(TESTFN)
328
329class COtherFileTests(OtherFileTests):
330    open = io.open
331
332class PyOtherFileTests(OtherFileTests):
333    open = staticmethod(pyio.open)
334
335
336def test_main():
337    # Historically, these tests have been sloppy about removing TESTFN.
338    # So get rid of it no matter what.
339    try:
340        run_unittest(CAutoFileTests, PyAutoFileTests,
341                     COtherFileTests, PyOtherFileTests)
342    finally:
343        if os.path.exists(TESTFN):
344            os.unlink(TESTFN)
345
346if __name__ == '__main__':
347    test_main()
348