test_zipfile.py revision 49259359eebaead3973816731fcb98bff0673900
1# We can test part of the module without zlib.
2try:
3    import zlib
4except ImportError:
5    zlib = None
6
7import os
8import io
9import sys
10import time
11import shutil
12import struct
13import zipfile
14import unittest
15
16from StringIO import StringIO
17from tempfile import TemporaryFile
18from random import randint, random
19from unittest import skipUnless
20
21from test.test_support import TESTFN, TESTFN_UNICODE, TESTFN_ENCODING, \
22                              run_unittest, findfile, unlink, check_warnings
# Probe whether the unicode test file name can be represented in the
# file system encoding; tests that need it are skipped otherwise.
try:
    TESTFN_UNICODE.encode(TESTFN_ENCODING)
except (UnicodeError, TypeError):
    # Either the file system encoding is None, or the file name
    # cannot be encoded in the file system encoding.
    TESTFN_UNICODE = None

# Second scratch path: used as the on-disk archive name by most tests and
# as a temporary directory by PyZipFileTests.test_write_python_directory.
TESTFN2 = TESTFN + "2"
# NOTE(review): not referenced in the visible part of this file --
# presumably a scratch directory name for tests further down; verify.
TESTFNDIR = TESTFN + "d"
# Number of text lines generated for the source file in the setUp methods.
FIXEDTEST_SIZE = 1000

# (archive member name, member content) pairs written with writestr() by
# the extract and close tests.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
38
39
class TestsWithSourceFile(unittest.TestCase):
    """Tests that archive a generated source file (TESTFN) and read it back."""

    def setUp(self):
        """Generate FIXEDTEST_SIZE lines of text and write them to TESTFN."""
        self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
                         for i in xrange(FIXEDTEST_SIZE)]
        self.data = '\n'.join(self.line_gen) + '\n'

        # Make a source file with some lines
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def _archive_targets(self):
        """Yield every kind of archive target the tests are run against.

        Yields, in order, a file name (TESTFN2), an open temporary file and
        a StringIO instance.  The temporary file is closed as soon as the
        caller asks for the next target, so no handle is leaked.
        """
        yield TESTFN2
        with TemporaryFile() as tmp:
            yield tmp
        yield StringIO()

    def _read_member(self, zipfp, name, next_size):
        """Return the content of member *name*, read chunk by chunk.

        *next_size* is called before each read() and returns the number of
        bytes to request.
        """
        chunks = []
        with zipfp.open(name) as member:
            while True:
                chunk = member.read(next_size())
                if not chunk:
                    break
                chunks.append(chunk)
        return ''.join(chunks)

    def make_test_archive(self, f, compression):
        """Create an archive in *f* holding TESTFN twice plus "strfile"."""
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)
            zipfp.writestr("strfile", self.data)

    def zip_test(self, f, compression):
        """Write the test archive to *f* and verify content and listings."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            self.assertEqual(zipfp.read(TESTFN), self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)
            self.assertEqual(zipfp.read("strfile"), self.data)

            # Print the ZIP directory
            fp = StringIO()
            stdout = sys.stdout
            try:
                sys.stdout = fp
                zipfp.printdir()
            finally:
                sys.stdout = stdout

            directory = fp.getvalue()
            lines = directory.splitlines()
            self.assertEqual(len(lines), 4) # Number of files + header

            self.assertIn('File Name', lines[0])
            self.assertIn('Modified', lines[0])
            self.assertIn('Size', lines[0])

            fn, date, time_, size = lines[1].split()
            self.assertEqual(fn, 'another.name')
            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
            self.assertEqual(size, str(len(self.data)))

            # Check the namelist
            names = zipfp.namelist()
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)

            # Check infolist
            infos = zipfp.infolist()
            names = [i.filename for i in infos]
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            for i in infos:
                self.assertEqual(i.file_size, len(self.data))

            # check getinfo
            for nm in (TESTFN, "another.name", "strfile"):
                info = zipfp.getinfo(nm)
                self.assertEqual(info.filename, nm)
                self.assertEqual(info.file_size, len(self.data))

            # Check that testzip doesn't raise an exception
            zipfp.testzip()

    def test_stored(self):
        for f in self._archive_targets():
            self.zip_test(f, zipfile.ZIP_STORED)

    def zip_open_test(self, f, compression):
        """Read two members through ZipFile.open() in 256-byte chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            for name in (TESTFN, "another.name"):
                self.assertEqual(
                    self._read_member(zipfp, name, lambda: 256), self.data)

    def test_open_stored(self):
        for f in self._archive_targets():
            self.zip_open_test(f, zipfile.ZIP_STORED)

    def test_open_via_zip_info(self):
        """Members sharing a name can be read individually via ZipInfo."""
        # Create the ZIP archive
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.writestr("name", "foo")
            with check_warnings(('', UserWarning)):
                zipfp.writestr("name", "bar")
            self.assertEqual(zipfp.namelist(), ["name"] * 2)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            infos = zipfp.infolist()
            chunks = []
            for info in infos:
                with zipfp.open(info) as f:
                    chunks.append(f.read())
            # assertIn reports the offending value on failure.
            self.assertIn(''.join(chunks), ("foobar", "barfoo"))
            data = ''.join(zipfp.read(info) for info in infos)
            self.assertIn(data, ("foobar", "barfoo"))

    def zip_random_open_test(self, f, compression):
        """Read a member through ZipFile.open() in randomly sized chunks."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            data = self._read_member(zipfp, TESTFN,
                                     lambda: randint(1, 1024))
            self.assertEqual(data, self.data)

    def test_random_open_stored(self):
        for f in self._archive_targets():
            self.zip_random_open_test(f, zipfile.ZIP_STORED)

    def test_univeral_readaheads(self):
        """Lines read in 'rU' mode map back to the original '\\r\\n' data."""
        f = StringIO()

        data = 'a\r\n' * 16 * 1024
        with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp:
            zipfp.writestr(TESTFN, data)

        lines = []
        with zipfile.ZipFile(f, 'r') as zipfp:
            with zipfp.open(TESTFN, 'rU') as zipopen:
                for line in zipopen:
                    lines.append(line)
        data2 = ''.join(lines)

        self.assertEqual(data, data2.replace('\n', '\r\n'))

    def zip_readline_read_test(self, f, compression):
        """Interleave readline() and read(100) calls on an opened member."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zipopen:
                data = ''
                while True:
                    read = zipopen.readline()
                    if not read:
                        break
                    data += read

                    read = zipopen.read(100)
                    if not read:
                        break
                    data += read

        self.assertEqual(data, self.data)

    def zip_readline_test(self, f, compression):
        """Each readline() call returns the next generated line."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zipopen:
                for line in self.line_gen:
                    linedata = zipopen.readline()
                    self.assertEqual(linedata, line + '\n')

    def zip_readlines_test(self, f, compression):
        """Lines returned by readlines() match the generated lines."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            with zipfp.open(TESTFN) as zo:
                ziplines = zo.readlines()
                for line, zipline in zip(self.line_gen, ziplines):
                    self.assertEqual(zipline, line + '\n')

    def zip_iterlines_test(self, f, compression):
        """Lines produced by iterating a member match the generated lines."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            # Open the member in a with block so it is closed again.
            with zipfp.open(TESTFN) as zipopen:
                for line, zipline in zip(self.line_gen, zipopen):
                    self.assertEqual(zipline, line + '\n')

    def test_readline_read_stored(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in self._archive_targets():
            self.zip_readline_read_test(f, zipfile.ZIP_STORED)

    def test_readline_stored(self):
        for f in self._archive_targets():
            self.zip_readline_test(f, zipfile.ZIP_STORED)

    def test_readlines_stored(self):
        for f in self._archive_targets():
            self.zip_readlines_test(f, zipfile.ZIP_STORED)

    def test_iterlines_stored(self):
        for f in self._archive_targets():
            self.zip_iterlines_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        for f in self._archive_targets():
            self.zip_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_open_deflated(self):
        for f in self._archive_targets():
            self.zip_open_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_random_open_deflated(self):
        for f in self._archive_targets():
            self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_read_deflated(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in self._archive_targets():
            self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_deflated(self):
        for f in self._archive_targets():
            self.zip_readline_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readlines_deflated(self):
        for f in self._archive_targets():
            self.zip_readlines_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_iterlines_deflated(self):
        for f in self._archive_targets():
            self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_low_compression(self):
        """Check for cases where compressed data is larger than original."""
        # Create the ZIP archive
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr("strfile", '12')

        # Get an open object for strfile
        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
            with zipfp.open("strfile") as openobj:
                self.assertEqual(openobj.read(1), '1')
                self.assertEqual(openobj.read(1), '2')

    def test_absolute_arcnames(self):
        """An arcname of "/absolute" is listed as "absolute"."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, "/absolute")

        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
            self.assertEqual(zipfp.namelist(), ["absolute"])

    def test_append_to_zip_file(self):
        """Test appending to an existing zipfile."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, TESTFN)

        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
            zipfp.writestr("strfile", self.data)
            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])

    def test_append_to_non_zip_file(self):
        """Test appending to an existing file that is not a zipfile."""
        # NOTE: this test fails if len(d) < 22 because of the first
        # line "fpin.seek(-22, 2)" in _EndRecData
        data = 'I am not a ZipFile!'*10
        with open(TESTFN2, 'wb') as f:
            f.write(data)

        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, TESTFN)

        with open(TESTFN2, 'rb') as f:
            f.seek(len(data))
            with zipfile.ZipFile(f, "r") as zipfp:
                self.assertEqual(zipfp.namelist(), [TESTFN])

    def test_ignores_newline_at_end(self):
        """An archive with trailing bytes appended can still be opened."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.write(TESTFN, TESTFN)
        with open(TESTFN2, 'a') as f:
            f.write("\r\n\00\00\00")
        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            self.assertIsInstance(zipfp, zipfile.ZipFile)

    def test_ignores_stuff_appended_past_comments(self):
        """Data appended after the archive comment leaves the comment intact."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            zipfp.comment = b"this is a comment"
            zipfp.write(TESTFN, TESTFN)
        with open(TESTFN2, 'a') as f:
            f.write("abcdef\r\n")
        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            self.assertIsInstance(zipfp, zipfile.ZipFile)
            self.assertEqual(zipfp.comment, b"this is a comment")

    def test_write_default_name(self):
        """Check that calling ZipFile.write without arcname specified
        produces the expected result."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.write(TESTFN)
            # Binary mode matches how setUp wrote the file, and the with
            # block closes the handle instead of leaking it.
            with open(TESTFN, "rb") as src:
                self.assertEqual(zipfp.read(TESTFN), src.read())

    @skipUnless(zlib, "requires zlib")
    def test_per_file_compression(self):
        """Check that files within a Zip archive can have different
        compression options."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
            sinfo = zipfp.getinfo('storeme')
            dinfo = zipfp.getinfo('deflateme')
            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)

    def test_write_to_readonly(self):
        """Check that trying to call write() on a readonly ZipFile object
        raises a RuntimeError."""
        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
            zipfp.writestr("somefile.txt", "bogus")

        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
            self.assertRaises(RuntimeError, zipfp.write, TESTFN)

    def test_extract(self):
        """extract() writes each member below the current directory."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                writtenfile = zipfp.extract(fpath)

                # make sure it was written to the right place
                correctfile = os.path.join(os.getcwd(), fpath)
                correctfile = os.path.normpath(correctfile)

                self.assertEqual(writtenfile, correctfile)

                # make sure correct data is in correct file
                self.check_file(writtenfile, fdata)
                os.remove(writtenfile)

        # remove the test file subdirectories
        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))

    def test_extract_all(self):
        """extractall() writes every member below the current directory."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            zipfp.extractall()
            for fpath, fdata in SMALL_TEST_DATA:
                outfile = os.path.join(os.getcwd(), fpath)

                self.check_file(outfile, fdata)
                os.remove(outfile)

        # remove the test file subdirectories
        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))

    def check_file(self, filename, content):
        """Assert that *filename* is a regular file containing *content*."""
        self.assertTrue(os.path.isfile(filename))
        with open(filename, 'rb') as f:
            self.assertEqual(f.read(), content)

    @skipUnless(TESTFN_UNICODE, "No Unicode filesystem semantics on this platform.")
    def test_extract_unicode_filenames(self):
        """Members with unicode names are extracted under those names."""
        fnames = [u'foo.txt', os.path.basename(TESTFN_UNICODE)]
        content = 'Test for unicode filename'
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
            for fname in fnames:
                zipfp.writestr(fname, content)

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            for fname in fnames:
                writtenfile = zipfp.extract(fname)

                # make sure it was written to the right place
                correctfile = os.path.join(os.getcwd(), fname)
                correctfile = os.path.normpath(correctfile)
                self.assertEqual(writtenfile, correctfile)

                self.check_file(writtenfile, content)
                os.remove(writtenfile)

    def test_extract_hackers_arcnames(self):
        """Hostile member names are extracted to sanitized relative paths."""
        hacknames = [
            ('../foo/bar', 'foo/bar'),
            ('foo/../bar', 'foo/bar'),
            ('foo/../../bar', 'foo/bar'),
            ('foo/bar/..', 'foo/bar'),
            ('./../foo/bar', 'foo/bar'),
            ('/foo/bar', 'foo/bar'),
            ('/foo/../bar', 'foo/bar'),
            ('/foo/../../bar', 'foo/bar'),
        ]
        if os.path.sep == '\\':
            hacknames.extend([
                (r'..\foo\bar', 'foo/bar'),
                (r'..\/foo\/bar', 'foo/bar'),
                (r'foo/\..\/bar', 'foo/bar'),
                (r'foo\/../\bar', 'foo/bar'),
                (r'C:foo/bar', 'foo/bar'),
                (r'C:/foo/bar', 'foo/bar'),
                (r'C://foo/bar', 'foo/bar'),
                (r'C:\foo\bar', 'foo/bar'),
                (r'//conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
                (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
                (r'//?/C:/foo/bar', '_/C_/foo/bar'),
                (r'\\?\C:\foo\bar', '_/C_/foo/bar'),
                (r'C:/../C:/foo/bar', 'C_/foo/bar'),
                (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
                ('../../foo../../ba..r', 'foo/ba..r'),
            ])
        else:  # Unix
            hacknames.extend([
                ('//foo/bar', 'foo/bar'),
                ('../../foo../../ba..r', 'foo../ba..r'),
                (r'foo/..\bar', r'foo/..\bar'),
            ])

        for arcname, fixedname in hacknames:
            content = b'foobar' + arcname.encode()
            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
                zinfo = zipfile.ZipInfo()
                # preserve backslashes
                zinfo.filename = arcname
                zinfo.external_attr = 0o600 << 16
                zipfp.writestr(zinfo, content)

            arcname = arcname.replace(os.sep, "/")
            targetpath = os.path.join('target', 'subdir', 'subsub')
            correctfile = os.path.join(targetpath, *fixedname.split('/'))

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname, targetpath)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            shutil.rmtree('target')

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall(targetpath)
            self.check_file(correctfile, content)
            shutil.rmtree('target')

            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            shutil.rmtree(fixedname.split('/')[0])

            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall()
            self.check_file(correctfile, content)
            shutil.rmtree(fixedname.split('/')[0])

            os.remove(TESTFN2)

    def test_writestr_compression(self):
        """writestr() honours an explicit compress_type argument."""
        # The with block closes the archive so tearDown can unlink TESTFN2.
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            zipfp.writestr("a.txt", "hello world",
                           compress_type=zipfile.ZIP_STORED)
            if zlib:
                zipfp.writestr("b.txt", "hello world",
                               compress_type=zipfile.ZIP_DEFLATED)

            info = zipfp.getinfo('a.txt')
            self.assertEqual(info.compress_type, zipfile.ZIP_STORED)

            if zlib:
                info = zipfp.getinfo('b.txt')
                self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED)

    def zip_test_writestr_permissions(self, f, compression):
        # Make sure that writestr creates files with mode 0600,
        # when it is passed a name rather than a ZipInfo instance.

        self.make_test_archive(f, compression)
        with zipfile.ZipFile(f, "r") as zipfp:
            zinfo = zipfp.getinfo('strfile')
            # 0o600 is the same octal spelling test_extract_hackers_arcnames uses.
            self.assertEqual(zinfo.external_attr, 0o600 << 16)

    def test_writestr_permissions(self):
        for f in self._archive_targets():
            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)

    def test_close(self):
        """Check that the zipfile is closed after the 'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)
                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
        self.assertIsNone(zipfp.fp, 'zipfp is not closed')

        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
        self.assertIsNone(zipfp.fp, 'zipfp is not closed')

    def test_close_on_exception(self):
        """Check that the zipfile is closed if an exception is raised in the
        'with' block."""
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                zipfp.writestr(fpath, fdata)

        try:
            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
                raise zipfile.BadZipfile()
        except zipfile.BadZipfile:
            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')

    def test_add_file_before_1980(self):
        """write() raises ValueError for a file dated 1970-01-01."""
        # Set atime and mtime to 1970-01-01
        os.utime(TESTFN, (0, 0))
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            self.assertRaises(ValueError, zipfp.write, TESTFN)

    def tearDown(self):
        """Remove the source file and the archive created by the test."""
        unlink(TESTFN)
        unlink(TESTFN2)
598
599
class TestZip64InSmallFiles(unittest.TestCase):
    # These tests test the ZIP64 functionality without using large files,
    # see test_zipfile64 for proper tests.

    def setUp(self):
        """Create the source file, then lower zipfile.ZIP64_LIMIT to 5."""
        line_gen = ("Test of zipfile line %d." % i
                    for i in range(0, FIXEDTEST_SIZE))
        self.data = '\n'.join(line_gen)

        # Make a source file with some lines
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

        # Patch the module-level limit last.  unittest skips tearDown when
        # setUp raises, so patching before the file write could leave the
        # limit at 5 for every later test.
        self._limit = zipfile.ZIP64_LIMIT
        zipfile.ZIP64_LIMIT = 5

    def _archive_targets(self):
        """Yield every kind of archive target the tests are run against.

        Yields, in order, a file name (TESTFN2), an open temporary file and
        a StringIO instance.  The temporary file is closed as soon as the
        caller asks for the next target, so no handle is leaked.
        """
        yield TESTFN2
        with TemporaryFile() as tmp:
            yield tmp
        yield StringIO()

    def large_file_exception_test(self, f, compression):
        """write() raises LargeZipFile when allowZip64 is not requested."""
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            self.assertRaises(zipfile.LargeZipFile,
                              zipfp.write, TESTFN, "another.name")

    def large_file_exception_test2(self, f, compression):
        """writestr() raises LargeZipFile when allowZip64 is not requested."""
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            self.assertRaises(zipfile.LargeZipFile,
                              zipfp.writestr, "another.name", self.data)

    def test_large_file_exception(self):
        for f in self._archive_targets():
            self.large_file_exception_test(f, zipfile.ZIP_STORED)
            self.large_file_exception_test2(f, zipfile.ZIP_STORED)

    def zip_test(self, f, compression):
        """Write an allowZip64 archive to *f* and verify content and listings."""
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)
            zipfp.writestr("strfile", self.data)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            self.assertEqual(zipfp.read(TESTFN), self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)
            self.assertEqual(zipfp.read("strfile"), self.data)

            # Print the ZIP directory
            fp = StringIO()
            stdout = sys.stdout
            try:
                sys.stdout = fp
                zipfp.printdir()
            finally:
                sys.stdout = stdout

            directory = fp.getvalue()
            lines = directory.splitlines()
            self.assertEqual(len(lines), 4) # Number of files + header

            self.assertIn('File Name', lines[0])
            self.assertIn('Modified', lines[0])
            self.assertIn('Size', lines[0])

            fn, date, time_, size = lines[1].split()
            self.assertEqual(fn, 'another.name')
            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
            self.assertEqual(size, str(len(self.data)))

            # Check the namelist
            names = zipfp.namelist()
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)

            # Check infolist
            infos = zipfp.infolist()
            names = [i.filename for i in infos]
            self.assertEqual(len(names), 3)
            self.assertIn(TESTFN, names)
            self.assertIn("another.name", names)
            self.assertIn("strfile", names)
            for i in infos:
                self.assertEqual(i.file_size, len(self.data))

            # check getinfo
            for nm in (TESTFN, "another.name", "strfile"):
                info = zipfp.getinfo(nm)
                self.assertEqual(info.filename, nm)
                self.assertEqual(info.file_size, len(self.data))

            # Check that testzip doesn't raise an exception
            zipfp.testzip()

    def test_stored(self):
        for f in self._archive_targets():
            self.zip_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        for f in self._archive_targets():
            self.zip_test(f, zipfile.ZIP_DEFLATED)

    def test_absolute_arcnames(self):
        """An arcname of "/absolute" is listed as "absolute"."""
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
                             allowZip64=True) as zipfp:
            zipfp.write(TESTFN, "/absolute")

        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
            self.assertEqual(zipfp.namelist(), ["absolute"])

    def tearDown(self):
        """Restore zipfile.ZIP64_LIMIT and remove the files created."""
        zipfile.ZIP64_LIMIT = self._limit
        unlink(TESTFN)
        unlink(TESTFN2)
714
715
class PyZipFileTests(unittest.TestCase):
    """Tests for zipfile.PyZipFile.writepy()."""

    def _assert_compiled_only(self, zipfp, bn):
        """Assert that *zipfp* lists bn + 'o' or bn + 'c', but not *bn*."""
        names = zipfp.namelist()
        self.assertNotIn(bn, names)
        self.assertTrue(bn + 'o' in names or bn + 'c' in names)

    def test_write_pyfile(self):
        """writepy() on a single module, with and without a basename."""
        fn = __file__
        if fn.endswith(('.pyc', '.pyo')):
            # Point at the .py source next to the compiled file.
            fn = fn[:-1]

        # The temporary files are opened in with blocks so they are closed.
        with TemporaryFile() as tmp:
            with zipfile.PyZipFile(tmp, "w") as zipfp:
                zipfp.writepy(fn)
                self._assert_compiled_only(zipfp, os.path.basename(fn))

        with TemporaryFile() as tmp:
            with zipfile.PyZipFile(tmp, "w") as zipfp:
                zipfp.writepy(fn, "testpackage")

                bn = "%s/%s" % ("testpackage", os.path.basename(fn))
                self._assert_compiled_only(zipfp, bn)

    def test_write_python_package(self):
        """writepy() on a package directory adds modules at every level."""
        import email
        packagedir = os.path.dirname(email.__file__)

        with TemporaryFile() as tmp:
            with zipfile.PyZipFile(tmp, "w") as zipfp:
                zipfp.writepy(packagedir)

                # Check for a couple of modules at different levels of the
                # hierarchy
                names = zipfp.namelist()
                self.assertTrue('email/__init__.pyo' in names or
                                'email/__init__.pyc' in names)
                self.assertTrue('email/mime/text.pyo' in names or
                                'email/mime/text.pyc' in names)

    def test_write_python_directory(self):
        """writepy() on a plain directory adds only the Python modules."""
        os.mkdir(TESTFN2)
        try:
            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
                fp.write("print(42)\n")

            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
                fp.write("print(42 * 42)\n")

            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
                fp.write("bla bla bla\n")

            # Both the archive and its backing file are closed by the with
            # blocks.
            with TemporaryFile() as tmp:
                with zipfile.PyZipFile(tmp, "w") as zipfp:
                    zipfp.writepy(TESTFN2)

                    names = zipfp.namelist()
                    self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names)
                    self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names)
                    self.assertNotIn('mod2.txt', names)

        finally:
            shutil.rmtree(TESTFN2)

    def test_write_non_pyfile(self):
        """writepy() raises RuntimeError for a file that is not a module."""
        with open(TESTFN, 'w') as fp:
            fp.write('most definitely not a python file')
        try:
            with TemporaryFile() as tmp:
                with zipfile.PyZipFile(tmp, "w") as zipfp:
                    self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
        finally:
            # Remove the scratch file even when the assertion fails.
            os.remove(TESTFN)
785
786
class OtherTests(unittest.TestCase):
    """Assorted ZipFile checks: malformed input files, invalid arguments,
    archive comments, CRC verification and bounded reads."""

    # Hand-built archives, keyed by compression method, whose member 'afile'
    # carries a CRC that does not match its data.
    zips_with_bad_crc = {
        zipfile.ZIP_STORED: (
            b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
            b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
            b'ilehello,AworldP'
            b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
            b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
            b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
            b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
            b'\0\0/\0\0\0\0\0'),
        zipfile.ZIP_DEFLATED: (
            b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
            b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
            b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
            b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
            b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
            b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
            b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'),
    }

    def test_unicode_filenames(self):
        # Member names given as unicode must come back unchanged after the
        # archive is written and re-opened.
        with zipfile.ZipFile(TESTFN, "w") as zf:
            zf.writestr(u"foo.txt", "Test for unicode filename")
            zf.writestr(u"\xf6.txt", "Test for unicode filename")
            self.assertIsInstance(zf.infolist()[0].filename, unicode)

        with zipfile.ZipFile(TESTFN, "r") as zf:
            self.assertEqual(zf.filelist[0].filename, "foo.txt")
            self.assertEqual(zf.filelist[1].filename, u"\xf6.txt")

    def test_create_non_existent_file_for_append(self):
        # Opening a missing file in 'a' mode must create a new archive.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        filename = 'testfile.txt'
        content = 'hello, world. this is some content.'

        try:
            with zipfile.ZipFile(TESTFN, 'a') as zf:
                zf.writestr(filename, content)
        except IOError:
            self.fail('Could not append data to a non-existent zip file.')

        self.assertTrue(os.path.exists(TESTFN))

        with zipfile.ZipFile(TESTFN, 'r') as zf:
            self.assertEqual(zf.read(filename), content)

    def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipfile:
            pass

    def test_is_zip_erroneous_file(self):
        """Check that is_zipfile() correctly identifies non-zip files."""
        # - passing a filename
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertFalse(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertFalse(chk)
        # - passing a file-like object
        fp = StringIO()
        fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(fp)
        self.assertFalse(chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertFalse(chk)

    def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipfile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it missing the last N bytes and make sure
        #   a BadZipfile exception is raised when we try to open it
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, fp)

    def test_is_zip_valid_file(self):
        """Check that is_zipfile() correctly identifies zip files."""
        # - passing a filename
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertTrue(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertTrue(chk)
            fp.seek(0, 0)
            zip_contents = fp.read()
        # - passing a file-like object
        fp = StringIO()
        fp.write(zip_contents)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)

    def test_non_existent_file_raises_IOError(self):
        # make sure we don't raise an AttributeError when a partially-constructed
        # ZipFile instance is finalized; this tests for regression on SF tracker
        # bug #403871.

        # The bug we're testing for caused an AttributeError to be raised
        # when a ZipFile instance was created for a file that did not
        # exist; the .fp member was not initialized but was needed by the
        # __del__() method.  Since the AttributeError is in the __del__(),
        # it is ignored, but the user should be sufficiently annoyed by
        # the message on the output that regression will be noticed
        # quickly.
        self.assertRaises(IOError, zipfile.ZipFile, TESTFN)

    def test_empty_file_raises_BadZipFile(self):
        # Both a zero-length file and a short non-zip file must be rejected
        # with BadZipfile.
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

    def test_closed_zip_raises_RuntimeError(self):
        """Verify that testzip() doesn't swallow inappropriate exceptions."""
        data = StringIO()
        with zipfile.ZipFile(data, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        # This is correct; calling .read on a closed ZipFile should raise
        # a RuntimeError, and so should calling .testzip.  An earlier
        # version of .testzip would swallow this exception (and any other)
        # and report that the first file in the archive was corrupt.
        self.assertRaises(RuntimeError, zipf.read, "foo.txt")
        self.assertRaises(RuntimeError, zipf.open, "foo.txt")
        self.assertRaises(RuntimeError, zipf.testzip)
        self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus")
        # Close the scratch file explicitly so its handle is not leaked.
        with open(TESTFN, 'w') as fp:
            fp.write('zipfile test data')
        self.assertRaises(RuntimeError, zipf.write, TESTFN)

    def test_bad_constructor_mode(self):
        """Check that bad modes passed to ZipFile constructor are caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q")

    def test_bad_open_mode(self):
        """Check that bad modes passed to ZipFile.open are caught."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            # read the data to make sure the file is there
            zipf.read("foo.txt")
            self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")

    def test_read0(self):
        """Check that calling read(0) on a ZipExtFile object returns an empty
        string and doesn't advance file pointer."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
            # read the data to make sure the file is there
            with zipf.open("foo.txt") as f:
                for i in xrange(FIXEDTEST_SIZE):
                    self.assertEqual(f.read(0), '')

                self.assertEqual(f.read(), "O, for a Muse of Fire!")

    def test_open_non_existent_item(self):
        """Check that attempting to call open() for an item that doesn't
        exist in the archive raises a KeyError."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")

    def test_bad_compression_mode(self):
        """Check that bad compression methods passed to the ZipFile
        constructor are caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1)

    def test_unsupported_compression(self):
        # data is declared as shrunk, but actually deflated
        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
        b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
        b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
        b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
        b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
            self.assertRaises(NotImplementedError, zipf.open, 'x')

    def test_null_byte_in_filename(self):
        """Check that a filename containing a null byte is properly
        terminated."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!")
            self.assertEqual(zipf.namelist(), ['foo.txt'])

    def test_struct_sizes(self):
        """Check that ZIP internal structure sizes are calculated correctly."""
        self.assertEqual(zipfile.sizeEndCentDir, 22)
        self.assertEqual(zipfile.sizeCentralDir, 46)
        self.assertEqual(zipfile.sizeEndCentDir64, 56)
        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)

    def test_comments(self):
        """Check that comments on the archive are handled properly."""

        # check default comment is empty
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            self.assertEqual(zipf.comment, '')
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, '')

        # check a simple short comment
        comment = 'Bravely taking to his feet, he beat a very brave retreat.'
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.comment = comment
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment)

        # check a comment of max length
        comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)])
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.comment = comment2
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment2)

        # check a comment that is too long is truncated
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            with check_warnings(('', UserWarning)):
                zipf.comment = comment2 + 'oops'
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment2)

    def test_change_comment_in_empty_archive(self):
        # A comment set on an archive opened in 'a' mode with no members
        # must be readable after re-opening.
        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
            self.assertFalse(zipf.filelist)
            zipf.comment = b"this is a comment"
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            self.assertEqual(zipf.comment, b"this is a comment")

    def test_change_comment_in_nonempty_archive(self):
        # Same as above, but the archive already holds one member when it
        # is opened in 'a' mode.
        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
            self.assertTrue(zipf.filelist)
            zipf.comment = b"this is a comment"
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            self.assertEqual(zipf.comment, b"this is a comment")

    def check_testzip_with_bad_crc(self, compression):
        """Tests that files with bad CRCs return their name from testzip."""
        zipdata = self.zips_with_bad_crc[compression]

        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            # testzip returns the name of the first corrupt file, or None
            self.assertEqual('afile', zipf.testzip())

    def test_testzip_with_bad_crc_stored(self):
        self.check_testzip_with_bad_crc(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_testzip_with_bad_crc_deflated(self):
        self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED)

    def check_read_with_bad_crc(self, compression):
        """Tests that files with bad CRCs raise a BadZipfile exception when read."""
        zipdata = self.zips_with_bad_crc[compression]

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipfile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    while corrupt_file.read(2):
                        pass

    def test_read_with_bad_crc_stored(self):
        self.check_read_with_bad_crc(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_with_bad_crc_deflated(self):
        self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED)

    def check_read_return_size(self, compression):
        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
        # than requested.
        for test_size in (1, 4095, 4096, 4097, 16384):
            # The member is one byte longer than the read, so a correct
            # read(test_size) can return exactly test_size bytes.
            file_size = test_size + 1
            junk = b''.join(struct.pack('B', randint(0, 255))
                            for x in range(file_size))
            with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf:
                zipf.writestr('foo', junk)
                with zipf.open('foo', 'r') as fp:
                    buf = fp.read(test_size)
                    self.assertEqual(len(buf), test_size)

    def test_read_return_size_stored(self):
        self.check_read_return_size(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_return_size_deflated(self):
        self.check_read_return_size(zipfile.ZIP_DEFLATED)

    def test_empty_zipfile(self):
        # Check that creating a file in 'w' or 'a' mode and closing without
        # adding any files to the archives creates a valid empty ZIP file
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            pass
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except zipfile.BadZipfile:
            self.fail("Unable to create empty ZIP file in 'w' mode")
        else:
            # Release the handle so the file can be rewritten and removed.
            zipf.close()

        with zipfile.ZipFile(TESTFN, mode="a") as zipf:
            pass
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
        except zipfile.BadZipfile:
            # Only an invalid archive is a test failure; any other exception
            # propagates and is reported as an error.
            self.fail("Unable to create empty ZIP file in 'a' mode")
        else:
            zipf.close()

    def test_open_empty_file(self):
        # Issue 1710703: Check that opening a file with less than 22 bytes
        # raises a BadZipfile exception (rather than the previously unhelpful
        # IOError)
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN, 'r')

    def test_create_zipinfo_before_1980(self):
        # A ZipInfo date_time with a year earlier than 1980 must be rejected.
        self.assertRaises(ValueError,
                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))

    def tearDown(self):
        unlink(TESTFN)
        unlink(TESTFN2)
1157
1158
class DecryptionTests(unittest.TestCase):
    """Exercise ZIP decryption against pre-generated encrypted archives,
    because the library does not support encryption at the moment."""

    # Encrypted archive with the member 'test.txt'.
    data = (
        'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
        '\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
        '\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
        'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
        '\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
        '\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
        '\x00\x00L\x00\x00\x00\x00\x00')
    # Encrypted archive with the member 'zero'.
    data2 = (
        'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
        '\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
        '\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
        'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
        '\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
        '\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
        'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
        '\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00')

    # Expected decrypted contents of 'test.txt' and 'zero' respectively.
    plain = 'zipfile.py encryption test'
    plain2 = 512 * '\x00'

    def setUp(self):
        # Write each archive to disk, then open it for reading.
        with open(TESTFN, "wb") as first_file:
            first_file.write(self.data)
        self.zip = zipfile.ZipFile(TESTFN, "r")
        with open(TESTFN2, "wb") as second_file:
            second_file.write(self.data2)
        self.zip2 = zipfile.ZipFile(TESTFN2, "r")

    def tearDown(self):
        # Close each archive before deleting the file behind it.
        for archive, path in ((self.zip, TESTFN), (self.zip2, TESTFN2)):
            archive.close()
            os.unlink(path)

    def test_no_password(self):
        # Reading an encrypted member without a password
        # must raise RuntimeError
        with self.assertRaises(RuntimeError):
            self.zip.read("test.txt")
        with self.assertRaises(RuntimeError):
            self.zip2.read("zero")

    def test_bad_password(self):
        # Reading with the wrong password must raise RuntimeError
        self.zip.setpassword("perl")
        with self.assertRaises(RuntimeError):
            self.zip.read("test.txt")
        self.zip2.setpassword("perl")
        with self.assertRaises(RuntimeError):
            self.zip2.read("zero")

    @skipUnless(zlib, "requires zlib")
    def test_good_password(self):
        # With the right password each member decrypts to its plaintext
        self.zip.setpassword("python")
        decrypted = self.zip.read("test.txt")
        self.assertEqual(decrypted, self.plain)
        self.zip2.setpassword("12345")
        decrypted2 = self.zip2.read("zero")
        self.assertEqual(decrypted2, self.plain2)
1217
1218
class TestsWithRandomBinaryFiles(unittest.TestCase):
    """Round-trip randomly generated binary data through ZIP archives."""

    def setUp(self):
        # A random number of packed little-endian floats forms the payload.
        float_count = randint(16, 64)*1024 + randint(1, 1024)
        self.data = ''.join(struct.pack('<f', random()*randint(-1000, 1000))
                            for _ in xrange(float_count))

        # Store the payload in the source file that gets archived
        with open(TESTFN, "wb") as source:
            source.write(self.data)

    def tearDown(self):
        unlink(TESTFN)
        unlink(TESTFN2)

    def make_test_archive(self, f, compression):
        """Write the source file into archive *f* under two member names."""
        with zipfile.ZipFile(f, "w", compression) as archive:
            archive.write(TESTFN, "another.name")
            archive.write(TESTFN, TESTFN)

    def zip_test(self, f, compression):
        """Archive the payload, then check both members via ZipFile.read()."""
        self.make_test_archive(f, compression)

        # Re-open the archive and compare each member with the payload
        with zipfile.ZipFile(f, "r", compression) as archive:
            roundtrip = archive.read(TESTFN)
            self.assertEqual(len(roundtrip), len(self.data))
            self.assertEqual(roundtrip, self.data)
            self.assertEqual(archive.read("another.name"), self.data)

    def test_stored(self):
        for target in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_test(target, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        for target in (TESTFN2, TemporaryFile(), io.BytesIO()):
            self.zip_test(target, zipfile.ZIP_DEFLATED)

    def zip_open_test(self, f, compression):
        """Archive the payload, then read both members back through
        ZipFile.open() in fixed 256-byte chunks."""
        self.make_test_archive(f, compression)

        with zipfile.ZipFile(f, "r", compression) as archive:
            # First member: keep reading until an empty chunk signals EOF
            first_chunks = []
            with archive.open(TESTFN) as member:
                chunk = member.read(256)
                while chunk:
                    first_chunks.append(chunk)
                    chunk = member.read(256)

            # Second member, same procedure
            second_chunks = []
            with archive.open("another.name") as member:
                chunk = member.read(256)
                while chunk:
                    second_chunks.append(chunk)
                    chunk = member.read(256)

            first_data = ''.join(first_chunks)
            self.assertEqual(len(first_data), len(self.data))
            self.assertEqual(first_data, self.data)

            second_data = ''.join(second_chunks)
            self.assertEqual(len(second_data), len(self.data))
            self.assertEqual(second_data, self.data)

    def test_open_stored(self):
        for target in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_open_test(target, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_open_deflated(self):
        for target in (TESTFN2, TemporaryFile(), io.BytesIO()):
            self.zip_open_test(target, zipfile.ZIP_DEFLATED)

    def zip_random_open_test(self, f, compression):
        """Archive the payload, then read one member back through
        ZipFile.open() using a fresh random chunk size for every read."""
        self.make_test_archive(f, compression)

        with zipfile.ZipFile(f, "r", compression) as archive:
            chunks = []
            with archive.open(TESTFN) as member:
                chunk = member.read(randint(1, 1024))
                while chunk:
                    chunks.append(chunk)
                    chunk = member.read(randint(1, 1024))

            roundtrip = ''.join(chunks)
            self.assertEqual(len(roundtrip), len(self.data))
            self.assertEqual(roundtrip, self.data)

    def test_random_open_stored(self):
        for target in (TESTFN2, TemporaryFile(), StringIO()):
            self.zip_random_open_test(target, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_random_open_deflated(self):
        for target in (TESTFN2, TemporaryFile(), io.BytesIO()):
            self.zip_random_open_test(target, zipfile.ZIP_DEFLATED)
1321
1322
@skipUnless(zlib, "requires zlib")
class TestsWithMultipleOpens(unittest.TestCase):
    """Check that several members of one archive can be open at the same
    time without their reads interfering with each other."""

    def setUp(self):
        # Create the ZIP archive: two members of FIXEDTEST_SIZE bytes each,
        # one all '1' characters and one all '2' characters.
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
            zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            # Open the same member twice; the context manager closes both
            # handles even if a read fails.
            with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read(500)
                data2 += zopen2.read(500)
            self.assertEqual(data1, data2)
            # Comparing the two reads with each other is not enough: both
            # could be wrong in the same way, so check the content as well.
            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read(500)
                data2 += zopen2.read(500)
            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
            self.assertEqual(data2, '2'*FIXEDTEST_SIZE)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            with zipf.open('ones') as zopen1:
                data1 = zopen1.read(500)
                # Open the second member only after the first has been
                # partially read, so that open() and read() calls are
                # actually interleaved.
                with zipf.open('twos') as zopen2:
                    data2 = zopen2.read(500)
                    data1 += zopen1.read(500)
                    data2 += zopen2.read(500)
            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
            self.assertEqual(data2, '2'*FIXEDTEST_SIZE)

    def tearDown(self):
        unlink(TESTFN2)
1369
1370
class TestWithDirectory(unittest.TestCase):
    """Check extraction and storage of directory entries, using TESTFN2
    as a scratch directory."""

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        # Extracting the "zipdir.zip" test archive must create the nested
        # path a/b/c below the scratch directory.
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_store_dir(self):
        # A directory written to an archive must get a member name that
        # ends with a slash.
        os.mkdir(os.path.join(TESTFN2, "x"))
        # Close the archive before tearDown() removes TESTFN; an open
        # handle would be leaked otherwise.
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(os.path.join(TESTFN2, "x"), "x")
            self.assertTrue(zipf.filelist[0].filename.endswith("x/"))

    def tearDown(self):
        shutil.rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
1397
1398
class UniversalNewlineTests(unittest.TestCase):
    """Read archive members that were opened in "rU" mode, for source
    files written with each of the three line separators."""

    def setUp(self):
        self.line_gen = ["Test of zipfile line %d." % i
                         for i in xrange(FIXEDTEST_SIZE)]
        self.seps = ('\r', '\r\n', '\n')
        # Both dicts are keyed by line separator: arcdata holds the lines
        # joined (and terminated) with that separator, arcfiles the name of
        # the file that data is written to.
        self.arcdata, self.arcfiles = {}, {}
        for n, s in enumerate(self.seps):
            self.arcdata[s] = s.join(self.line_gen) + s
            self.arcfiles[s] = '%s-%d' % (TESTFN, n)
            # Close each file explicitly so no handle is leaked and the
            # data is on disk before a test archives it.
            with open(self.arcfiles[s], "wb") as fp:
                fp.write(self.arcdata[s])

    def make_test_archive(self, f, compression):
        """Write every per-separator source file into archive *f*."""
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for fn in self.arcfiles.values():
                zipfp.write(fn, fn)

    def read_test(self, f, compression):
        """Check read() on each member against the data written for it."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as fp:
                    zipdata = fp.read()
                self.assertEqual(self.arcdata[sep], zipdata)

    def readline_read_test(self, f, compression):
        """Check members read by alternating readline() and read(5)."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive; the context manager closes it even when an
        # assertion fails.
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as zipopen:
                    data = ''
                    while True:
                        read = zipopen.readline()
                        if not read:
                            break
                        data += read

                        read = zipopen.read(5)
                        if not read:
                            break
                        data += read

                self.assertEqual(data, self.arcdata['\n'])

    def readline_test(self, f, compression):
        """Check that readline() yields every line terminated by '\\n'."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as zipopen:
                    for line in self.line_gen:
                        linedata = zipopen.readline()
                        self.assertEqual(linedata, line + '\n')

    def readlines_test(self, f, compression):
        """Check the lines returned by readlines() against the source lines."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as fp:
                    ziplines = fp.readlines()
                for line, zipline in zip(self.line_gen, ziplines):
                    self.assertEqual(zipline, line + '\n')

    def iterlines_test(self, f, compression):
        """Check the lines produced by iterating over an opened member."""
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                # Open the member in a with-block so it is closed once the
                # comparison is done.
                with zipfp.open(fn, "rU") as fp:
                    for line, zipline in zip(self.line_gen, fp):
                        self.assertEqual(zipline, line + '\n')

    def test_read_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.read_test(f, zipfile.ZIP_STORED)

    def test_readline_read_stored(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_read_test(f, zipfile.ZIP_STORED)

    def test_readline_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_test(f, zipfile.ZIP_STORED)

    def test_readlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readlines_test(f, zipfile.ZIP_STORED)

    def test_iterlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.iterlines_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.read_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_read_deflated(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_read_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readlines_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_iterlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.iterlines_test(f, zipfile.ZIP_DEFLATED)

    def tearDown(self):
        # Remove the per-separator source files, then the shared test files.
        for fn in self.arcfiles.values():
            os.remove(fn)
        unlink(TESTFN)
        unlink(TESTFN2)
1532
1533
def test_main():
    """Hand every test case class of this module to run_unittest()."""
    # Kept in the same order as before.
    suites = (
        TestsWithSourceFile, TestZip64InSmallFiles, OtherTests,
        PyZipFileTests, DecryptionTests, TestsWithMultipleOpens,
        TestWithDirectory, UniversalNewlineTests,
        TestsWithRandomBinaryFiles,
    )
    run_unittest(*suites)


if __name__ == "__main__":
    # Executed as a script rather than imported: run the suites.
    test_main()
1542