test_zipfile.py revision 0d654335689b88defdc6403624e3bfa8c0fd108b
1# We can test part of the module without zlib.
2try:
3    import zlib
4except ImportError:
5    zlib = None
6
7import os
8import io
9import sys
10import time
11import shutil
12import struct
13import zipfile
14import unittest
15
16from StringIO import StringIO
17from tempfile import TemporaryFile
18from random import randint, random
19from unittest import skipUnless
20
21from test.test_support import TESTFN, run_unittest, findfile, unlink
22
23TESTFN2 = TESTFN + "2"
24TESTFNDIR = TESTFN + "d"
25FIXEDTEST_SIZE = 1000
26
27SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
28                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
29                   ('/ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
30                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
31
32
33class TestsWithSourceFile(unittest.TestCase):
34    def setUp(self):
35        self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
36                         for i in xrange(FIXEDTEST_SIZE)]
37        self.data = '\n'.join(self.line_gen) + '\n'
38
39        # Make a source file with some lines
40        with open(TESTFN, "wb") as fp:
41            fp.write(self.data)
42
43    def make_test_archive(self, f, compression):
44        # Create the ZIP archive
45        with zipfile.ZipFile(f, "w", compression) as zipfp:
46            zipfp.write(TESTFN, "another.name")
47            zipfp.write(TESTFN, TESTFN)
48            zipfp.writestr("strfile", self.data)
49
50    def zip_test(self, f, compression):
51        self.make_test_archive(f, compression)
52
53        # Read the ZIP archive
54        with zipfile.ZipFile(f, "r", compression) as zipfp:
55            self.assertEqual(zipfp.read(TESTFN), self.data)
56            self.assertEqual(zipfp.read("another.name"), self.data)
57            self.assertEqual(zipfp.read("strfile"), self.data)
58
59            # Print the ZIP directory
60            fp = StringIO()
61            stdout = sys.stdout
62            try:
63                sys.stdout = fp
64                zipfp.printdir()
65            finally:
66                sys.stdout = stdout
67
68            directory = fp.getvalue()
69            lines = directory.splitlines()
70            self.assertEqual(len(lines), 4) # Number of files + header
71
72            self.assertIn('File Name', lines[0])
73            self.assertIn('Modified', lines[0])
74            self.assertIn('Size', lines[0])
75
76            fn, date, time_, size = lines[1].split()
77            self.assertEqual(fn, 'another.name')
78            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
79            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
80            self.assertEqual(size, str(len(self.data)))
81
82            # Check the namelist
83            names = zipfp.namelist()
84            self.assertEqual(len(names), 3)
85            self.assertIn(TESTFN, names)
86            self.assertIn("another.name", names)
87            self.assertIn("strfile", names)
88
89            # Check infolist
90            infos = zipfp.infolist()
91            names = [i.filename for i in infos]
92            self.assertEqual(len(names), 3)
93            self.assertIn(TESTFN, names)
94            self.assertIn("another.name", names)
95            self.assertIn("strfile", names)
96            for i in infos:
97                self.assertEqual(i.file_size, len(self.data))
98
99            # check getinfo
100            for nm in (TESTFN, "another.name", "strfile"):
101                info = zipfp.getinfo(nm)
102                self.assertEqual(info.filename, nm)
103                self.assertEqual(info.file_size, len(self.data))
104
105            # Check that testzip doesn't raise an exception
106            zipfp.testzip()
107
108    def test_stored(self):
109        for f in (TESTFN2, TemporaryFile(), StringIO()):
110            self.zip_test(f, zipfile.ZIP_STORED)
111
112    def zip_open_test(self, f, compression):
113        self.make_test_archive(f, compression)
114
115        # Read the ZIP archive
116        with zipfile.ZipFile(f, "r", compression) as zipfp:
117            zipdata1 = []
118            with zipfp.open(TESTFN) as zipopen1:
119                while True:
120                    read_data = zipopen1.read(256)
121                    if not read_data:
122                        break
123                    zipdata1.append(read_data)
124
125            zipdata2 = []
126            with zipfp.open("another.name") as zipopen2:
127                while True:
128                    read_data = zipopen2.read(256)
129                    if not read_data:
130                        break
131                    zipdata2.append(read_data)
132
133            self.assertEqual(''.join(zipdata1), self.data)
134            self.assertEqual(''.join(zipdata2), self.data)
135
136    def test_open_stored(self):
137        for f in (TESTFN2, TemporaryFile(), StringIO()):
138            self.zip_open_test(f, zipfile.ZIP_STORED)
139
140    def test_open_via_zip_info(self):
141        # Create the ZIP archive
142        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
143            zipfp.writestr("name", "foo")
144            zipfp.writestr("name", "bar")
145
146        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
147            infos = zipfp.infolist()
148            data = ""
149            for info in infos:
150                with zipfp.open(info) as f:
151                    data += f.read()
152            self.assertTrue(data == "foobar" or data == "barfoo")
153            data = ""
154            for info in infos:
155                data += zipfp.read(info)
156            self.assertTrue(data == "foobar" or data == "barfoo")
157
158    def zip_random_open_test(self, f, compression):
159        self.make_test_archive(f, compression)
160
161        # Read the ZIP archive
162        with zipfile.ZipFile(f, "r", compression) as zipfp:
163            zipdata1 = []
164            with zipfp.open(TESTFN) as zipopen1:
165                while True:
166                    read_data = zipopen1.read(randint(1, 1024))
167                    if not read_data:
168                        break
169                    zipdata1.append(read_data)
170
171            self.assertEqual(''.join(zipdata1), self.data)
172
173    def test_random_open_stored(self):
174        for f in (TESTFN2, TemporaryFile(), StringIO()):
175            self.zip_random_open_test(f, zipfile.ZIP_STORED)
176
177    def test_univeral_readaheads(self):
178        f = StringIO()
179
180        data = 'a\r\n' * 16 * 1024
181        with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp:
182            zipfp.writestr(TESTFN, data)
183
184        data2 = ''
185        with zipfile.ZipFile(f, 'r') as zipfp:
186            with zipfp.open(TESTFN, 'rU') as zipopen:
187                for line in zipopen:
188                    data2 += line
189
190        self.assertEqual(data, data2.replace('\n', '\r\n'))
191
192    def zip_readline_read_test(self, f, compression):
193        self.make_test_archive(f, compression)
194
195        # Read the ZIP archive
196        with zipfile.ZipFile(f, "r") as zipfp:
197            with zipfp.open(TESTFN) as zipopen:
198                data = ''
199                while True:
200                    read = zipopen.readline()
201                    if not read:
202                        break
203                    data += read
204
205                    read = zipopen.read(100)
206                    if not read:
207                        break
208                    data += read
209
210        self.assertEqual(data, self.data)
211
212    def zip_readline_test(self, f, compression):
213        self.make_test_archive(f, compression)
214
215        # Read the ZIP archive
216        with zipfile.ZipFile(f, "r") as zipfp:
217            with zipfp.open(TESTFN) as zipopen:
218                for line in self.line_gen:
219                    linedata = zipopen.readline()
220                    self.assertEqual(linedata, line + '\n')
221
222    def zip_readlines_test(self, f, compression):
223        self.make_test_archive(f, compression)
224
225        # Read the ZIP archive
226        with zipfile.ZipFile(f, "r") as zipfp:
227            with zipfp.open(TESTFN) as zo:
228                ziplines = zo.readlines()
229                for line, zipline in zip(self.line_gen, ziplines):
230                    self.assertEqual(zipline, line + '\n')
231
232    def zip_iterlines_test(self, f, compression):
233        self.make_test_archive(f, compression)
234
235        # Read the ZIP archive
236        with zipfile.ZipFile(f, "r") as zipfp:
237            for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
238                self.assertEqual(zipline, line + '\n')
239
240    def test_readline_read_stored(self):
241        # Issue #7610: calls to readline() interleaved with calls to read().
242        for f in (TESTFN2, TemporaryFile(), StringIO()):
243            self.zip_readline_read_test(f, zipfile.ZIP_STORED)
244
245    def test_readline_stored(self):
246        for f in (TESTFN2, TemporaryFile(), StringIO()):
247            self.zip_readline_test(f, zipfile.ZIP_STORED)
248
249    def test_readlines_stored(self):
250        for f in (TESTFN2, TemporaryFile(), StringIO()):
251            self.zip_readlines_test(f, zipfile.ZIP_STORED)
252
253    def test_iterlines_stored(self):
254        for f in (TESTFN2, TemporaryFile(), StringIO()):
255            self.zip_iterlines_test(f, zipfile.ZIP_STORED)
256
257    @skipUnless(zlib, "requires zlib")
258    def test_deflated(self):
259        for f in (TESTFN2, TemporaryFile(), StringIO()):
260            self.zip_test(f, zipfile.ZIP_DEFLATED)
261
262    @skipUnless(zlib, "requires zlib")
263    def test_open_deflated(self):
264        for f in (TESTFN2, TemporaryFile(), StringIO()):
265            self.zip_open_test(f, zipfile.ZIP_DEFLATED)
266
267    @skipUnless(zlib, "requires zlib")
268    def test_random_open_deflated(self):
269        for f in (TESTFN2, TemporaryFile(), StringIO()):
270            self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
271
272    @skipUnless(zlib, "requires zlib")
273    def test_readline_read_deflated(self):
274        # Issue #7610: calls to readline() interleaved with calls to read().
275        for f in (TESTFN2, TemporaryFile(), StringIO()):
276            self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED)
277
278    @skipUnless(zlib, "requires zlib")
279    def test_readline_deflated(self):
280        for f in (TESTFN2, TemporaryFile(), StringIO()):
281            self.zip_readline_test(f, zipfile.ZIP_DEFLATED)
282
283    @skipUnless(zlib, "requires zlib")
284    def test_readlines_deflated(self):
285        for f in (TESTFN2, TemporaryFile(), StringIO()):
286            self.zip_readlines_test(f, zipfile.ZIP_DEFLATED)
287
288    @skipUnless(zlib, "requires zlib")
289    def test_iterlines_deflated(self):
290        for f in (TESTFN2, TemporaryFile(), StringIO()):
291            self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED)
292
293    @skipUnless(zlib, "requires zlib")
294    def test_low_compression(self):
295        """Check for cases where compressed data is larger than original."""
296        # Create the ZIP archive
297        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
298            zipfp.writestr("strfile", '12')
299
300        # Get an open object for strfile
301        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
302            with zipfp.open("strfile") as openobj:
303                self.assertEqual(openobj.read(1), '1')
304                self.assertEqual(openobj.read(1), '2')
305
306    def test_absolute_arcnames(self):
307        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
308            zipfp.write(TESTFN, "/absolute")
309
310        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
311            self.assertEqual(zipfp.namelist(), ["absolute"])
312
313    def test_append_to_zip_file(self):
314        """Test appending to an existing zipfile."""
315        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
316            zipfp.write(TESTFN, TESTFN)
317
318        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
319            zipfp.writestr("strfile", self.data)
320            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
321
322    def test_append_to_non_zip_file(self):
323        """Test appending to an existing file that is not a zipfile."""
324        # NOTE: this test fails if len(d) < 22 because of the first
325        # line "fpin.seek(-22, 2)" in _EndRecData
326        data = 'I am not a ZipFile!'*10
327        with open(TESTFN2, 'wb') as f:
328            f.write(data)
329
330        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
331            zipfp.write(TESTFN, TESTFN)
332
333        with open(TESTFN2, 'rb') as f:
334            f.seek(len(data))
335            with zipfile.ZipFile(f, "r") as zipfp:
336                self.assertEqual(zipfp.namelist(), [TESTFN])
337
338    def test_write_default_name(self):
339        """Check that calling ZipFile.write without arcname specified
340        produces the expected result."""
341        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
342            zipfp.write(TESTFN)
343            self.assertEqual(zipfp.read(TESTFN), open(TESTFN).read())
344
345    @skipUnless(zlib, "requires zlib")
346    def test_per_file_compression(self):
347        """Check that files within a Zip archive can have different
348        compression options."""
349        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
350            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
351            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
352            sinfo = zipfp.getinfo('storeme')
353            dinfo = zipfp.getinfo('deflateme')
354            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
355            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
356
357    def test_write_to_readonly(self):
358        """Check that trying to call write() on a readonly ZipFile object
359        raises a RuntimeError."""
360        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
361            zipfp.writestr("somefile.txt", "bogus")
362
363        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
364            self.assertRaises(RuntimeError, zipfp.write, TESTFN)
365
366    def test_extract(self):
367        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
368            for fpath, fdata in SMALL_TEST_DATA:
369                zipfp.writestr(fpath, fdata)
370
371        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
372            for fpath, fdata in SMALL_TEST_DATA:
373                writtenfile = zipfp.extract(fpath)
374
375                # make sure it was written to the right place
376                if os.path.isabs(fpath):
377                    correctfile = os.path.join(os.getcwd(), fpath[1:])
378                else:
379                    correctfile = os.path.join(os.getcwd(), fpath)
380                correctfile = os.path.normpath(correctfile)
381
382                self.assertEqual(writtenfile, correctfile)
383
384                # make sure correct data is in correct file
385                self.assertEqual(fdata, open(writtenfile, "rb").read())
386                os.remove(writtenfile)
387
388        # remove the test file subdirectories
389        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
390
391    def test_extract_all(self):
392        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
393            for fpath, fdata in SMALL_TEST_DATA:
394                zipfp.writestr(fpath, fdata)
395
396        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
397            zipfp.extractall()
398            for fpath, fdata in SMALL_TEST_DATA:
399                if os.path.isabs(fpath):
400                    outfile = os.path.join(os.getcwd(), fpath[1:])
401                else:
402                    outfile = os.path.join(os.getcwd(), fpath)
403
404                self.assertEqual(fdata, open(outfile, "rb").read())
405                os.remove(outfile)
406
407        # remove the test file subdirectories
408        shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
409
410    def test_writestr_compression(self):
411        zipfp = zipfile.ZipFile(TESTFN2, "w")
412        zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED)
413        if zlib:
414            zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED)
415
416        info = zipfp.getinfo('a.txt')
417        self.assertEqual(info.compress_type, zipfile.ZIP_STORED)
418
419        if zlib:
420            info = zipfp.getinfo('b.txt')
421            self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED)
422
423
424    def zip_test_writestr_permissions(self, f, compression):
425        # Make sure that writestr creates files with mode 0600,
426        # when it is passed a name rather than a ZipInfo instance.
427
428        self.make_test_archive(f, compression)
429        with zipfile.ZipFile(f, "r") as zipfp:
430            zinfo = zipfp.getinfo('strfile')
431            self.assertEqual(zinfo.external_attr, 0600 << 16)
432
433    def test_writestr_permissions(self):
434        for f in (TESTFN2, TemporaryFile(), StringIO()):
435            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
436
437    def test_close(self):
438        """Check that the zipfile is closed after the 'with' block."""
439        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
440            for fpath, fdata in SMALL_TEST_DATA:
441                zipfp.writestr(fpath, fdata)
442                self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
443        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
444
445        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
446            self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
447        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
448
449    def test_close_on_exception(self):
450        """Check that the zipfile is closed if an exception is raised in the
451        'with' block."""
452        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
453            for fpath, fdata in SMALL_TEST_DATA:
454                zipfp.writestr(fpath, fdata)
455
456        try:
457            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
458                raise zipfile.BadZipfile()
459        except zipfile.BadZipfile:
460            self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')
461
462    def tearDown(self):
463        unlink(TESTFN)
464        unlink(TESTFN2)
465
466
467class TestZip64InSmallFiles(unittest.TestCase):
468    # These tests test the ZIP64 functionality without using large files,
469    # see test_zipfile64 for proper tests.
470
471    def setUp(self):
472        self._limit = zipfile.ZIP64_LIMIT
473        zipfile.ZIP64_LIMIT = 5
474
475        line_gen = ("Test of zipfile line %d." % i
476                    for i in range(0, FIXEDTEST_SIZE))
477        self.data = '\n'.join(line_gen)
478
479        # Make a source file with some lines
480        with open(TESTFN, "wb") as fp:
481            fp.write(self.data)
482
483    def large_file_exception_test(self, f, compression):
484        with zipfile.ZipFile(f, "w", compression) as zipfp:
485            self.assertRaises(zipfile.LargeZipFile,
486                              zipfp.write, TESTFN, "another.name")
487
488    def large_file_exception_test2(self, f, compression):
489        with zipfile.ZipFile(f, "w", compression) as zipfp:
490            self.assertRaises(zipfile.LargeZipFile,
491                              zipfp.writestr, "another.name", self.data)
492
493    def test_large_file_exception(self):
494        for f in (TESTFN2, TemporaryFile(), StringIO()):
495            self.large_file_exception_test(f, zipfile.ZIP_STORED)
496            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
497
498    def zip_test(self, f, compression):
499        # Create the ZIP archive
500        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
501            zipfp.write(TESTFN, "another.name")
502            zipfp.write(TESTFN, TESTFN)
503            zipfp.writestr("strfile", self.data)
504
505        # Read the ZIP archive
506        with zipfile.ZipFile(f, "r", compression) as zipfp:
507            self.assertEqual(zipfp.read(TESTFN), self.data)
508            self.assertEqual(zipfp.read("another.name"), self.data)
509            self.assertEqual(zipfp.read("strfile"), self.data)
510
511            # Print the ZIP directory
512            fp = StringIO()
513            stdout = sys.stdout
514            try:
515                sys.stdout = fp
516                zipfp.printdir()
517            finally:
518                sys.stdout = stdout
519
520            directory = fp.getvalue()
521            lines = directory.splitlines()
522            self.assertEqual(len(lines), 4) # Number of files + header
523
524            self.assertIn('File Name', lines[0])
525            self.assertIn('Modified', lines[0])
526            self.assertIn('Size', lines[0])
527
528            fn, date, time_, size = lines[1].split()
529            self.assertEqual(fn, 'another.name')
530            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
531            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
532            self.assertEqual(size, str(len(self.data)))
533
534            # Check the namelist
535            names = zipfp.namelist()
536            self.assertEqual(len(names), 3)
537            self.assertIn(TESTFN, names)
538            self.assertIn("another.name", names)
539            self.assertIn("strfile", names)
540
541            # Check infolist
542            infos = zipfp.infolist()
543            names = [i.filename for i in infos]
544            self.assertEqual(len(names), 3)
545            self.assertIn(TESTFN, names)
546            self.assertIn("another.name", names)
547            self.assertIn("strfile", names)
548            for i in infos:
549                self.assertEqual(i.file_size, len(self.data))
550
551            # check getinfo
552            for nm in (TESTFN, "another.name", "strfile"):
553                info = zipfp.getinfo(nm)
554                self.assertEqual(info.filename, nm)
555                self.assertEqual(info.file_size, len(self.data))
556
557            # Check that testzip doesn't raise an exception
558            zipfp.testzip()
559
560    def test_stored(self):
561        for f in (TESTFN2, TemporaryFile(), StringIO()):
562            self.zip_test(f, zipfile.ZIP_STORED)
563
564    @skipUnless(zlib, "requires zlib")
565    def test_deflated(self):
566        for f in (TESTFN2, TemporaryFile(), StringIO()):
567            self.zip_test(f, zipfile.ZIP_DEFLATED)
568
569    def test_absolute_arcnames(self):
570        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
571                             allowZip64=True) as zipfp:
572            zipfp.write(TESTFN, "/absolute")
573
574        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
575            self.assertEqual(zipfp.namelist(), ["absolute"])
576
577    def tearDown(self):
578        zipfile.ZIP64_LIMIT = self._limit
579        unlink(TESTFN)
580        unlink(TESTFN2)
581
582
583class PyZipFileTests(unittest.TestCase):
584    def test_write_pyfile(self):
585        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
586            fn = __file__
587            if fn.endswith('.pyc') or fn.endswith('.pyo'):
588                fn = fn[:-1]
589
590            zipfp.writepy(fn)
591
592            bn = os.path.basename(fn)
593            self.assertNotIn(bn, zipfp.namelist())
594            self.assertTrue(bn + 'o' in zipfp.namelist() or
595                            bn + 'c' in zipfp.namelist())
596
597        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
598            fn = __file__
599            if fn.endswith(('.pyc', '.pyo')):
600                fn = fn[:-1]
601
602            zipfp.writepy(fn, "testpackage")
603
604            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
605            self.assertNotIn(bn, zipfp.namelist())
606            self.assertTrue(bn + 'o' in zipfp.namelist() or
607                            bn + 'c' in zipfp.namelist())
608
609    def test_write_python_package(self):
610        import email
611        packagedir = os.path.dirname(email.__file__)
612
613        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
614            zipfp.writepy(packagedir)
615
616            # Check for a couple of modules at different levels of the
617            # hierarchy
618            names = zipfp.namelist()
619            self.assertTrue('email/__init__.pyo' in names or
620                            'email/__init__.pyc' in names)
621            self.assertTrue('email/mime/text.pyo' in names or
622                            'email/mime/text.pyc' in names)
623
624    def test_write_python_directory(self):
625        os.mkdir(TESTFN2)
626        try:
627            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
628                fp.write("print(42)\n")
629
630            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
631                fp.write("print(42 * 42)\n")
632
633            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
634                fp.write("bla bla bla\n")
635
636            zipfp  = zipfile.PyZipFile(TemporaryFile(), "w")
637            zipfp.writepy(TESTFN2)
638
639            names = zipfp.namelist()
640            self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names)
641            self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names)
642            self.assertNotIn('mod2.txt', names)
643
644        finally:
645            shutil.rmtree(TESTFN2)
646
647    def test_write_non_pyfile(self):
648        with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
649            open(TESTFN, 'w').write('most definitely not a python file')
650            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
651            os.remove(TESTFN)
652
653
654class OtherTests(unittest.TestCase):
655    zips_with_bad_crc = {
656        zipfile.ZIP_STORED: (
657            b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
658            b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
659            b'ilehello,AworldP'
660            b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
661            b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
662            b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
663            b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
664            b'\0\0/\0\0\0\0\0'),
665        zipfile.ZIP_DEFLATED: (
666            b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
667            b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
668            b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
669            b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
670            b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
671            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
672            b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
673            b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'),
674    }
675
676    def test_unicode_filenames(self):
677        with zipfile.ZipFile(TESTFN, "w") as zf:
678            zf.writestr(u"foo.txt", "Test for unicode filename")
679            zf.writestr(u"\xf6.txt", "Test for unicode filename")
680            self.assertIsInstance(zf.infolist()[0].filename, unicode)
681
682        with zipfile.ZipFile(TESTFN, "r") as zf:
683            self.assertEqual(zf.filelist[0].filename, "foo.txt")
684            self.assertEqual(zf.filelist[1].filename, u"\xf6.txt")
685
686    def test_create_non_existent_file_for_append(self):
687        if os.path.exists(TESTFN):
688            os.unlink(TESTFN)
689
690        filename = 'testfile.txt'
691        content = 'hello, world. this is some content.'
692
693        try:
694            with zipfile.ZipFile(TESTFN, 'a') as zf:
695                zf.writestr(filename, content)
696        except IOError:
697            self.fail('Could not append data to a non-existent zip file.')
698
699        self.assertTrue(os.path.exists(TESTFN))
700
701        with zipfile.ZipFile(TESTFN, 'r') as zf:
702            self.assertEqual(zf.read(filename), content)
703
704    def test_close_erroneous_file(self):
705        # This test checks that the ZipFile constructor closes the file object
706        # it opens if there's an error in the file.  If it doesn't, the
707        # traceback holds a reference to the ZipFile object and, indirectly,
708        # the file object.
709        # On Windows, this causes the os.unlink() call to fail because the
710        # underlying file is still open.  This is SF bug #412214.
711        #
712        with open(TESTFN, "w") as fp:
713            fp.write("this is not a legal zip file\n")
714        try:
715            zf = zipfile.ZipFile(TESTFN)
716        except zipfile.BadZipfile:
717            pass
718
719    def test_is_zip_erroneous_file(self):
720        """Check that is_zipfile() correctly identifies non-zip files."""
721        # - passing a filename
722        with open(TESTFN, "w") as fp:
723            fp.write("this is not a legal zip file\n")
724        chk = zipfile.is_zipfile(TESTFN)
725        self.assertFalse(chk)
726        # - passing a file object
727        with open(TESTFN, "rb") as fp:
728            chk = zipfile.is_zipfile(fp)
729            self.assertTrue(not chk)
730        # - passing a file-like object
731        fp = StringIO()
732        fp.write("this is not a legal zip file\n")
733        chk = zipfile.is_zipfile(fp)
734        self.assertTrue(not chk)
735        fp.seek(0, 0)
736        chk = zipfile.is_zipfile(fp)
737        self.assertTrue(not chk)
738
739    def test_is_zip_valid_file(self):
740        """Check that is_zipfile() correctly identifies zip files."""
741        # - passing a filename
742        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
743            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
744        chk = zipfile.is_zipfile(TESTFN)
745        self.assertTrue(chk)
746        # - passing a file object
747        with open(TESTFN, "rb") as fp:
748            chk = zipfile.is_zipfile(fp)
749            self.assertTrue(chk)
750            fp.seek(0, 0)
751            zip_contents = fp.read()
752        # - passing a file-like object
753        fp = StringIO()
754        fp.write(zip_contents)
755        chk = zipfile.is_zipfile(fp)
756        self.assertTrue(chk)
757        fp.seek(0, 0)
758        chk = zipfile.is_zipfile(fp)
759        self.assertTrue(chk)
760
761    def test_non_existent_file_raises_IOError(self):
762        # make sure we don't raise an AttributeError when a partially-constructed
763        # ZipFile instance is finalized; this tests for regression on SF tracker
764        # bug #403871.
765
766        # The bug we're testing for caused an AttributeError to be raised
767        # when a ZipFile instance was created for a file that did not
768        # exist; the .fp member was not initialized but was needed by the
769        # __del__() method.  Since the AttributeError is in the __del__(),
770        # it is ignored, but the user should be sufficiently annoyed by
771        # the message on the output that regression will be noticed
772        # quickly.
773        self.assertRaises(IOError, zipfile.ZipFile, TESTFN)
774
775    def test_empty_file_raises_BadZipFile(self):
776        with open(TESTFN, 'w') as f:
777            pass
778        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)
779
780        with open(TESTFN, 'w') as fp:
781            fp.write("short file")
782        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)
783
784    def test_closed_zip_raises_RuntimeError(self):
785        """Verify that testzip() doesn't swallow inappropriate exceptions."""
786        data = StringIO()
787        with zipfile.ZipFile(data, mode="w") as zipf:
788            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
789
790        # This is correct; calling .read on a closed ZipFile should throw
791        # a RuntimeError, and so should calling .testzip.  An earlier
792        # version of .testzip would swallow this exception (and any other)
793        # and report that the first file in the archive was corrupt.
794        self.assertRaises(RuntimeError, zipf.read, "foo.txt")
795        self.assertRaises(RuntimeError, zipf.open, "foo.txt")
796        self.assertRaises(RuntimeError, zipf.testzip)
797        self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus")
798        open(TESTFN, 'w').write('zipfile test data')
799        self.assertRaises(RuntimeError, zipf.write, TESTFN)
800
801    def test_bad_constructor_mode(self):
802        """Check that bad modes passed to ZipFile constructor are caught."""
803        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q")
804
805    def test_bad_open_mode(self):
806        """Check that bad modes passed to ZipFile.open are caught."""
807        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
808            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
809
810        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
811        # read the data to make sure the file is there
812            zipf.read("foo.txt")
813            self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")
814
815    def test_read0(self):
816        """Check that calling read(0) on a ZipExtFile object returns an empty
817        string and doesn't advance file pointer."""
818        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
819            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
820            # read the data to make sure the file is there
821            with zipf.open("foo.txt") as f:
822                for i in xrange(FIXEDTEST_SIZE):
823                    self.assertEqual(f.read(0), '')
824
825                self.assertEqual(f.read(), "O, for a Muse of Fire!")
826
827    def test_open_non_existent_item(self):
828        """Check that attempting to call open() for an item that doesn't
829        exist in the archive raises a RuntimeError."""
830        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
831            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
832
833    def test_bad_compression_mode(self):
834        """Check that bad compression methods passed to ZipFile.open are
835        caught."""
836        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1)
837
838    def test_null_byte_in_filename(self):
839        """Check that a filename containing a null byte is properly
840        terminated."""
841        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
842            zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!")
843            self.assertEqual(zipf.namelist(), ['foo.txt'])
844
845    def test_struct_sizes(self):
846        """Check that ZIP internal structure sizes are calculated correctly."""
847        self.assertEqual(zipfile.sizeEndCentDir, 22)
848        self.assertEqual(zipfile.sizeCentralDir, 46)
849        self.assertEqual(zipfile.sizeEndCentDir64, 56)
850        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
851
852    def test_comments(self):
853        """Check that comments on the archive are handled properly."""
854
855        # check default comment is empty
856        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
857            self.assertEqual(zipf.comment, '')
858            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
859
860        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
861            self.assertEqual(zipf.comment, '')
862
863        # check a simple short comment
864        comment = 'Bravely taking to his feet, he beat a very brave retreat.'
865        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
866            zipf.comment = comment
867            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
868        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
869            self.assertEqual(zipf.comment, comment)
870
871        # check a comment of max length
872        comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)])
873        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
874            zipf.comment = comment2
875            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
876
877        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
878            self.assertEqual(zipf.comment, comment2)
879
880        # check a comment that is too long is truncated
881        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
882            zipf.comment = comment2 + 'oops'
883            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
884        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
885            self.assertEqual(zipf.comment, comment2)
886
887    def check_testzip_with_bad_crc(self, compression):
888        """Tests that files with bad CRCs return their name from testzip."""
889        zipdata = self.zips_with_bad_crc[compression]
890
891        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
892            # testzip returns the name of the first corrupt file, or None
893            self.assertEqual('afile', zipf.testzip())
894
895    def test_testzip_with_bad_crc_stored(self):
896        self.check_testzip_with_bad_crc(zipfile.ZIP_STORED)
897
898    @skipUnless(zlib, "requires zlib")
899    def test_testzip_with_bad_crc_deflated(self):
900        self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED)
901
902    def check_read_with_bad_crc(self, compression):
903        """Tests that files with bad CRCs raise a BadZipfile exception when read."""
904        zipdata = self.zips_with_bad_crc[compression]
905
906        # Using ZipFile.read()
907        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
908            self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')
909
910        # Using ZipExtFile.read()
911        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
912            with zipf.open('afile', 'r') as corrupt_file:
913                self.assertRaises(zipfile.BadZipfile, corrupt_file.read)
914
915        # Same with small reads (in order to exercise the buffering logic)
916        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
917            with zipf.open('afile', 'r') as corrupt_file:
918                corrupt_file.MIN_READ_SIZE = 2
919                with self.assertRaises(zipfile.BadZipfile):
920                    while corrupt_file.read(2):
921                        pass
922
923    def test_read_with_bad_crc_stored(self):
924        self.check_read_with_bad_crc(zipfile.ZIP_STORED)
925
926    @skipUnless(zlib, "requires zlib")
927    def test_read_with_bad_crc_deflated(self):
928        self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED)
929
930    def check_read_return_size(self, compression):
931        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
932        # than requested.
933        for test_size in (1, 4095, 4096, 4097, 16384):
934            file_size = test_size + 1
935            junk = b''.join(struct.pack('B', randint(0, 255))
936                            for x in range(file_size))
937            with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf:
938                zipf.writestr('foo', junk)
939                with zipf.open('foo', 'r') as fp:
940                    buf = fp.read(test_size)
941                    self.assertEqual(len(buf), test_size)
942
943    def test_read_return_size_stored(self):
944        self.check_read_return_size(zipfile.ZIP_STORED)
945
946    @skipUnless(zlib, "requires zlib")
947    def test_read_return_size_deflated(self):
948        self.check_read_return_size(zipfile.ZIP_DEFLATED)
949
950    def test_empty_zipfile(self):
951        # Check that creating a file in 'w' or 'a' mode and closing without
952        # adding any files to the archives creates a valid empty ZIP file
953        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
954            pass
955        try:
956            zipf = zipfile.ZipFile(TESTFN, mode="r")
957        except zipfile.BadZipfile:
958            self.fail("Unable to create empty ZIP file in 'w' mode")
959
960        with zipfile.ZipFile(TESTFN, mode="a") as zipf:
961            pass
962        try:
963            zipf = zipfile.ZipFile(TESTFN, mode="r")
964        except:
965            self.fail("Unable to create empty ZIP file in 'a' mode")
966
967    def test_open_empty_file(self):
968        # Issue 1710703: Check that opening a file with less than 22 bytes
969        # raises a BadZipfile exception (rather than the previously unhelpful
970        # IOError)
971        with open(TESTFN, 'w') as f:
972            pass
973        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN, 'r')
974
975    def tearDown(self):
976        unlink(TESTFN)
977        unlink(TESTFN2)
978
979
980class DecryptionTests(unittest.TestCase):
981    """Check that ZIP decryption works. Since the library does not
982    support encryption at the moment, we use a pre-generated encrypted
983    ZIP file."""
984
985    data = (
986    'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
987    '\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
988    '\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
989    'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
990    '\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
991    '\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
992    '\x00\x00L\x00\x00\x00\x00\x00' )
993    data2 = (
994    'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
995    '\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
996    '\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
997    'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
998    '\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
999    '\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
1000    'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
1001    '\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )
1002
1003    plain = 'zipfile.py encryption test'
1004    plain2 = '\x00'*512
1005
1006    def setUp(self):
1007        with open(TESTFN, "wb") as fp:
1008            fp.write(self.data)
1009        self.zip = zipfile.ZipFile(TESTFN, "r")
1010        with open(TESTFN2, "wb") as fp:
1011            fp.write(self.data2)
1012        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
1013
1014    def tearDown(self):
1015        self.zip.close()
1016        os.unlink(TESTFN)
1017        self.zip2.close()
1018        os.unlink(TESTFN2)
1019
1020    def test_no_password(self):
1021        # Reading the encrypted file without password
1022        # must generate a RunTime exception
1023        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
1024        self.assertRaises(RuntimeError, self.zip2.read, "zero")
1025
1026    def test_bad_password(self):
1027        self.zip.setpassword("perl")
1028        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
1029        self.zip2.setpassword("perl")
1030        self.assertRaises(RuntimeError, self.zip2.read, "zero")
1031
1032    @skipUnless(zlib, "requires zlib")
1033    def test_good_password(self):
1034        self.zip.setpassword("python")
1035        self.assertEqual(self.zip.read("test.txt"), self.plain)
1036        self.zip2.setpassword("12345")
1037        self.assertEqual(self.zip2.read("zero"), self.plain2)
1038
1039
1040class TestsWithRandomBinaryFiles(unittest.TestCase):
1041    def setUp(self):
1042        datacount = randint(16, 64)*1024 + randint(1, 1024)
1043        self.data = ''.join(struct.pack('<f', random()*randint(-1000, 1000))
1044                            for i in xrange(datacount))
1045
1046        # Make a source file with some lines
1047        with open(TESTFN, "wb") as fp:
1048            fp.write(self.data)
1049
1050    def tearDown(self):
1051        unlink(TESTFN)
1052        unlink(TESTFN2)
1053
1054    def make_test_archive(self, f, compression):
1055        # Create the ZIP archive
1056        with zipfile.ZipFile(f, "w", compression) as zipfp:
1057            zipfp.write(TESTFN, "another.name")
1058            zipfp.write(TESTFN, TESTFN)
1059
1060    def zip_test(self, f, compression):
1061        self.make_test_archive(f, compression)
1062
1063        # Read the ZIP archive
1064        with zipfile.ZipFile(f, "r", compression) as zipfp:
1065            testdata = zipfp.read(TESTFN)
1066            self.assertEqual(len(testdata), len(self.data))
1067            self.assertEqual(testdata, self.data)
1068            self.assertEqual(zipfp.read("another.name"), self.data)
1069
1070    def test_stored(self):
1071        for f in (TESTFN2, TemporaryFile(), StringIO()):
1072            self.zip_test(f, zipfile.ZIP_STORED)
1073
1074    @skipUnless(zlib, "requires zlib")
1075    def test_deflated(self):
1076        for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
1077            self.zip_test(f, zipfile.ZIP_DEFLATED)
1078
1079    def zip_open_test(self, f, compression):
1080        self.make_test_archive(f, compression)
1081
1082        # Read the ZIP archive
1083        with zipfile.ZipFile(f, "r", compression) as zipfp:
1084            zipdata1 = []
1085            with zipfp.open(TESTFN) as zipopen1:
1086                while True:
1087                    read_data = zipopen1.read(256)
1088                    if not read_data:
1089                        break
1090                    zipdata1.append(read_data)
1091
1092            zipdata2 = []
1093            with zipfp.open("another.name") as zipopen2:
1094                while True:
1095                    read_data = zipopen2.read(256)
1096                    if not read_data:
1097                        break
1098                    zipdata2.append(read_data)
1099
1100            testdata1 = ''.join(zipdata1)
1101            self.assertEqual(len(testdata1), len(self.data))
1102            self.assertEqual(testdata1, self.data)
1103
1104            testdata2 = ''.join(zipdata2)
1105            self.assertEqual(len(testdata2), len(self.data))
1106            self.assertEqual(testdata2, self.data)
1107
1108    def test_open_stored(self):
1109        for f in (TESTFN2, TemporaryFile(), StringIO()):
1110            self.zip_open_test(f, zipfile.ZIP_STORED)
1111
1112    @skipUnless(zlib, "requires zlib")
1113    def test_open_deflated(self):
1114        for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
1115            self.zip_open_test(f, zipfile.ZIP_DEFLATED)
1116
1117    def zip_random_open_test(self, f, compression):
1118        self.make_test_archive(f, compression)
1119
1120        # Read the ZIP archive
1121        with zipfile.ZipFile(f, "r", compression) as zipfp:
1122            zipdata1 = []
1123            with zipfp.open(TESTFN) as zipopen1:
1124                while True:
1125                    read_data = zipopen1.read(randint(1, 1024))
1126                    if not read_data:
1127                        break
1128                    zipdata1.append(read_data)
1129
1130            testdata = ''.join(zipdata1)
1131            self.assertEqual(len(testdata), len(self.data))
1132            self.assertEqual(testdata, self.data)
1133
1134    def test_random_open_stored(self):
1135        for f in (TESTFN2, TemporaryFile(), StringIO()):
1136            self.zip_random_open_test(f, zipfile.ZIP_STORED)
1137
1138    @skipUnless(zlib, "requires zlib")
1139    def test_random_open_deflated(self):
1140        for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
1141            self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
1142
1143
1144@skipUnless(zlib, "requires zlib")
1145class TestsWithMultipleOpens(unittest.TestCase):
1146    def setUp(self):
1147        # Create the ZIP archive
1148        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
1149            zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
1150            zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)
1151
1152    def test_same_file(self):
1153        # Verify that (when the ZipFile is in control of creating file objects)
1154        # multiple open() calls can be made without interfering with each other.
1155        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
1156            zopen1 = zipf.open('ones')
1157            zopen2 = zipf.open('ones')
1158            data1 = zopen1.read(500)
1159            data2 = zopen2.read(500)
1160            data1 += zopen1.read(500)
1161            data2 += zopen2.read(500)
1162            self.assertEqual(data1, data2)
1163
1164    def test_different_file(self):
1165        # Verify that (when the ZipFile is in control of creating file objects)
1166        # multiple open() calls can be made without interfering with each other.
1167        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
1168            with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
1169                data1 = zopen1.read(500)
1170                data2 = zopen2.read(500)
1171                data1 += zopen1.read(500)
1172                data2 += zopen2.read(500)
1173            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
1174            self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
1175
1176    def test_interleaved(self):
1177        # Verify that (when the ZipFile is in control of creating file objects)
1178        # multiple open() calls can be made without interfering with each other.
1179        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
1180            with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
1181                data1 = zopen1.read(500)
1182                data2 = zopen2.read(500)
1183                data1 += zopen1.read(500)
1184                data2 += zopen2.read(500)
1185            self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
1186            self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
1187
1188    def tearDown(self):
1189        unlink(TESTFN2)
1190
1191
1192class TestWithDirectory(unittest.TestCase):
1193    def setUp(self):
1194        os.mkdir(TESTFN2)
1195
1196    def test_extract_dir(self):
1197        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
1198            zipf.extractall(TESTFN2)
1199        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
1200        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
1201        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))
1202
1203    def test_bug_6050(self):
1204        # Extraction should succeed if directories already exist
1205        os.mkdir(os.path.join(TESTFN2, "a"))
1206        self.test_extract_dir()
1207
1208    def test_store_dir(self):
1209        os.mkdir(os.path.join(TESTFN2, "x"))
1210        zipf = zipfile.ZipFile(TESTFN, "w")
1211        zipf.write(os.path.join(TESTFN2, "x"), "x")
1212        self.assertTrue(zipf.filelist[0].filename.endswith("x/"))
1213
1214    def tearDown(self):
1215        shutil.rmtree(TESTFN2)
1216        if os.path.exists(TESTFN):
1217            unlink(TESTFN)
1218
1219
1220class UniversalNewlineTests(unittest.TestCase):
1221    def setUp(self):
1222        self.line_gen = ["Test of zipfile line %d." % i
1223                         for i in xrange(FIXEDTEST_SIZE)]
1224        self.seps = ('\r', '\r\n', '\n')
1225        self.arcdata, self.arcfiles = {}, {}
1226        for n, s in enumerate(self.seps):
1227            self.arcdata[s] = s.join(self.line_gen) + s
1228            self.arcfiles[s] = '%s-%d' % (TESTFN, n)
1229            open(self.arcfiles[s], "wb").write(self.arcdata[s])
1230
1231    def make_test_archive(self, f, compression):
1232        # Create the ZIP archive
1233        with zipfile.ZipFile(f, "w", compression) as zipfp:
1234            for fn in self.arcfiles.values():
1235                zipfp.write(fn, fn)
1236
1237    def read_test(self, f, compression):
1238        self.make_test_archive(f, compression)
1239
1240        # Read the ZIP archive
1241        with zipfile.ZipFile(f, "r") as zipfp:
1242            for sep, fn in self.arcfiles.items():
1243                with zipfp.open(fn, "rU") as fp:
1244                    zipdata = fp.read()
1245                self.assertEqual(self.arcdata[sep], zipdata)
1246
1247    def readline_read_test(self, f, compression):
1248        self.make_test_archive(f, compression)
1249
1250        # Read the ZIP archive
1251        zipfp = zipfile.ZipFile(f, "r")
1252        for sep, fn in self.arcfiles.items():
1253            with zipfp.open(fn, "rU") as zipopen:
1254                data = ''
1255                while True:
1256                    read = zipopen.readline()
1257                    if not read:
1258                        break
1259                    data += read
1260
1261                    read = zipopen.read(5)
1262                    if not read:
1263                        break
1264                    data += read
1265
1266            self.assertEqual(data, self.arcdata['\n'])
1267
1268        zipfp.close()
1269
1270    def readline_test(self, f, compression):
1271        self.make_test_archive(f, compression)
1272
1273        # Read the ZIP archive
1274        with zipfile.ZipFile(f, "r") as zipfp:
1275            for sep, fn in self.arcfiles.items():
1276                with zipfp.open(fn, "rU") as zipopen:
1277                    for line in self.line_gen:
1278                        linedata = zipopen.readline()
1279                        self.assertEqual(linedata, line + '\n')
1280
1281    def readlines_test(self, f, compression):
1282        self.make_test_archive(f, compression)
1283
1284        # Read the ZIP archive
1285        with zipfile.ZipFile(f, "r") as zipfp:
1286            for sep, fn in self.arcfiles.items():
1287                with zipfp.open(fn, "rU") as fp:
1288                    ziplines = fp.readlines()
1289                for line, zipline in zip(self.line_gen, ziplines):
1290                    self.assertEqual(zipline, line + '\n')
1291
1292    def iterlines_test(self, f, compression):
1293        self.make_test_archive(f, compression)
1294
1295        # Read the ZIP archive
1296        with zipfile.ZipFile(f, "r") as zipfp:
1297            for sep, fn in self.arcfiles.items():
1298                for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
1299                    self.assertEqual(zipline, line + '\n')
1300
1301    def test_read_stored(self):
1302        for f in (TESTFN2, TemporaryFile(), StringIO()):
1303            self.read_test(f, zipfile.ZIP_STORED)
1304
1305    def test_readline_read_stored(self):
1306        # Issue #7610: calls to readline() interleaved with calls to read().
1307        for f in (TESTFN2, TemporaryFile(), StringIO()):
1308            self.readline_read_test(f, zipfile.ZIP_STORED)
1309
1310    def test_readline_stored(self):
1311        for f in (TESTFN2, TemporaryFile(), StringIO()):
1312            self.readline_test(f, zipfile.ZIP_STORED)
1313
1314    def test_readlines_stored(self):
1315        for f in (TESTFN2, TemporaryFile(), StringIO()):
1316            self.readlines_test(f, zipfile.ZIP_STORED)
1317
1318    def test_iterlines_stored(self):
1319        for f in (TESTFN2, TemporaryFile(), StringIO()):
1320            self.iterlines_test(f, zipfile.ZIP_STORED)
1321
1322    @skipUnless(zlib, "requires zlib")
1323    def test_read_deflated(self):
1324        for f in (TESTFN2, TemporaryFile(), StringIO()):
1325            self.read_test(f, zipfile.ZIP_DEFLATED)
1326
1327    @skipUnless(zlib, "requires zlib")
1328    def test_readline_read_deflated(self):
1329        # Issue #7610: calls to readline() interleaved with calls to read().
1330        for f in (TESTFN2, TemporaryFile(), StringIO()):
1331            self.readline_read_test(f, zipfile.ZIP_DEFLATED)
1332
1333    @skipUnless(zlib, "requires zlib")
1334    def test_readline_deflated(self):
1335        for f in (TESTFN2, TemporaryFile(), StringIO()):
1336            self.readline_test(f, zipfile.ZIP_DEFLATED)
1337
1338    @skipUnless(zlib, "requires zlib")
1339    def test_readlines_deflated(self):
1340        for f in (TESTFN2, TemporaryFile(), StringIO()):
1341            self.readlines_test(f, zipfile.ZIP_DEFLATED)
1342
1343    @skipUnless(zlib, "requires zlib")
1344    def test_iterlines_deflated(self):
1345        for f in (TESTFN2, TemporaryFile(), StringIO()):
1346            self.iterlines_test(f, zipfile.ZIP_DEFLATED)
1347
1348    def tearDown(self):
1349        for sep, fn in self.arcfiles.items():
1350            os.remove(fn)
1351        unlink(TESTFN)
1352        unlink(TESTFN2)
1353
1354
1355def test_main():
1356    run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests,
1357                 PyZipFileTests, DecryptionTests, TestsWithMultipleOpens,
1358                 TestWithDirectory, UniversalNewlineTests,
1359                 TestsWithRandomBinaryFiles)
1360
1361if __name__ == "__main__":
1362    test_main()
1363