1# We can test part of the module without zlib.
2try:
3    import zlib
4except ImportError:
5    zlib = None
6
7import os
8import io
9import sys
10import time
11import struct
12import zipfile
13import unittest
14
15from StringIO import StringIO
16from tempfile import TemporaryFile
17from random import randint, random, getrandbits
18from unittest import skipUnless
19
20from test import script_helper
21from test.test_support import TESTFN, TESTFN_UNICODE, TESTFN_ENCODING, \
22                              run_unittest, findfile, unlink, rmtree, \
23                              check_warnings, captured_stdout
# Probe whether the unicode test file name can be encoded with the test
# file system encoding; TESTFN_UNICODE is cleared when it cannot, and the
# unicode-filename test is skipped in that case.
try:
    TESTFN_UNICODE.encode(TESTFN_ENCODING)
except (UnicodeError, TypeError):
    # Either the file system encoding is None, or the file name
    # cannot be encoded in the file system encoding.
    TESTFN_UNICODE = None

# Second scratch file name, derived from TESTFN; the tests write their
# archives here.
TESTFN2 = TESTFN + "2"
# Scratch name derived from TESTFN with a "d" suffix.
TESTFNDIR = TESTFN + "d"
# Number of lines generated for the source-file fixtures.
FIXEDTEST_SIZE = 1000

# (archive member name, member content) pairs used by the extraction and
# close tests.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
39
def getrandbytes(size):
    """Return a byte string holding exactly *size* random bytes.

    The random bits come from ``getrandbits`` and are rendered through a
    zero-padded hex string, so leading zero bytes are kept.
    """
    if size == 0:
        # A zero-width hex format still renders the single digit "0", which
        # is not a valid (even-length) hex string, so handle "no bytes" here.
        return b''
    return bytes(bytearray.fromhex('%0*x' % (2 * size, getrandbits(8 * size))))
42
43class TestsWithSourceFile(unittest.TestCase):
44    def setUp(self):
45        self.line_gen = ["Zipfile test line %d. random float: %f" % (i, random())
46                         for i in xrange(FIXEDTEST_SIZE)]
47        self.data = '\n'.join(self.line_gen) + '\n'
48
49        # Make a source file with some lines
50        with open(TESTFN, "wb") as fp:
51            fp.write(self.data)
52
53    def make_test_archive(self, f, compression):
54        # Create the ZIP archive
55        with zipfile.ZipFile(f, "w", compression) as zipfp:
56            zipfp.write(TESTFN, "another.name")
57            zipfp.write(TESTFN, TESTFN)
58            zipfp.writestr("strfile", self.data)
59
60    def zip_test(self, f, compression):
61        self.make_test_archive(f, compression)
62
63        # Read the ZIP archive
64        with zipfile.ZipFile(f, "r", compression) as zipfp:
65            self.assertEqual(zipfp.read(TESTFN), self.data)
66            self.assertEqual(zipfp.read("another.name"), self.data)
67            self.assertEqual(zipfp.read("strfile"), self.data)
68
69            # Print the ZIP directory
70            fp = StringIO()
71            stdout = sys.stdout
72            try:
73                sys.stdout = fp
74                zipfp.printdir()
75            finally:
76                sys.stdout = stdout
77
78            directory = fp.getvalue()
79            lines = directory.splitlines()
80            self.assertEqual(len(lines), 4) # Number of files + header
81
82            self.assertIn('File Name', lines[0])
83            self.assertIn('Modified', lines[0])
84            self.assertIn('Size', lines[0])
85
86            fn, date, time_, size = lines[1].split()
87            self.assertEqual(fn, 'another.name')
88            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
89            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
90            self.assertEqual(size, str(len(self.data)))
91
92            # Check the namelist
93            names = zipfp.namelist()
94            self.assertEqual(len(names), 3)
95            self.assertIn(TESTFN, names)
96            self.assertIn("another.name", names)
97            self.assertIn("strfile", names)
98
99            # Check infolist
100            infos = zipfp.infolist()
101            names = [i.filename for i in infos]
102            self.assertEqual(len(names), 3)
103            self.assertIn(TESTFN, names)
104            self.assertIn("another.name", names)
105            self.assertIn("strfile", names)
106            for i in infos:
107                self.assertEqual(i.file_size, len(self.data))
108
109            # check getinfo
110            for nm in (TESTFN, "another.name", "strfile"):
111                info = zipfp.getinfo(nm)
112                self.assertEqual(info.filename, nm)
113                self.assertEqual(info.file_size, len(self.data))
114
115            # Check that testzip doesn't raise an exception
116            zipfp.testzip()
117
118    def test_stored(self):
119        for f in (TESTFN2, TemporaryFile(), StringIO()):
120            self.zip_test(f, zipfile.ZIP_STORED)
121
122    def zip_open_test(self, f, compression):
123        self.make_test_archive(f, compression)
124
125        # Read the ZIP archive
126        with zipfile.ZipFile(f, "r", compression) as zipfp:
127            zipdata1 = []
128            with zipfp.open(TESTFN) as zipopen1:
129                while True:
130                    read_data = zipopen1.read(256)
131                    if not read_data:
132                        break
133                    zipdata1.append(read_data)
134
135            zipdata2 = []
136            with zipfp.open("another.name") as zipopen2:
137                while True:
138                    read_data = zipopen2.read(256)
139                    if not read_data:
140                        break
141                    zipdata2.append(read_data)
142
143            self.assertEqual(''.join(zipdata1), self.data)
144            self.assertEqual(''.join(zipdata2), self.data)
145
146    def test_open_stored(self):
147        for f in (TESTFN2, TemporaryFile(), StringIO()):
148            self.zip_open_test(f, zipfile.ZIP_STORED)
149
150    def test_open_via_zip_info(self):
151        # Create the ZIP archive
152        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
153            zipfp.writestr("name", "foo")
154            with check_warnings(('', UserWarning)):
155                zipfp.writestr("name", "bar")
156            self.assertEqual(zipfp.namelist(), ["name"] * 2)
157
158        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
159            infos = zipfp.infolist()
160            data = ""
161            for info in infos:
162                with zipfp.open(info) as f:
163                    data += f.read()
164            self.assertTrue(data == "foobar" or data == "barfoo")
165            data = ""
166            for info in infos:
167                data += zipfp.read(info)
168            self.assertTrue(data == "foobar" or data == "barfoo")
169
170    def zip_random_open_test(self, f, compression):
171        self.make_test_archive(f, compression)
172
173        # Read the ZIP archive
174        with zipfile.ZipFile(f, "r", compression) as zipfp:
175            zipdata1 = []
176            with zipfp.open(TESTFN) as zipopen1:
177                while True:
178                    read_data = zipopen1.read(randint(1, 1024))
179                    if not read_data:
180                        break
181                    zipdata1.append(read_data)
182
183            self.assertEqual(''.join(zipdata1), self.data)
184
185    def test_random_open_stored(self):
186        for f in (TESTFN2, TemporaryFile(), StringIO()):
187            self.zip_random_open_test(f, zipfile.ZIP_STORED)
188
189    def test_universal_readaheads(self):
190        f = StringIO()
191
192        data = 'a\r\n' * 16 * 1024
193        with zipfile.ZipFile(f, 'w', zipfile.ZIP_STORED) as zipfp:
194            zipfp.writestr(TESTFN, data)
195
196        data2 = ''
197        with zipfile.ZipFile(f, 'r') as zipfp:
198            with zipfp.open(TESTFN, 'rU') as zipopen:
199                for line in zipopen:
200                    data2 += line
201
202        self.assertEqual(data, data2.replace('\n', '\r\n'))
203
204    def zip_readline_read_test(self, f, compression):
205        self.make_test_archive(f, compression)
206
207        # Read the ZIP archive
208        with zipfile.ZipFile(f, "r") as zipfp:
209            with zipfp.open(TESTFN) as zipopen:
210                data = ''
211                while True:
212                    read = zipopen.readline()
213                    if not read:
214                        break
215                    data += read
216
217                    read = zipopen.read(100)
218                    if not read:
219                        break
220                    data += read
221
222        self.assertEqual(data, self.data)
223
224    def zip_readline_test(self, f, compression):
225        self.make_test_archive(f, compression)
226
227        # Read the ZIP archive
228        with zipfile.ZipFile(f, "r") as zipfp:
229            with zipfp.open(TESTFN) as zipopen:
230                for line in self.line_gen:
231                    linedata = zipopen.readline()
232                    self.assertEqual(linedata, line + '\n')
233
234    def zip_readlines_test(self, f, compression):
235        self.make_test_archive(f, compression)
236
237        # Read the ZIP archive
238        with zipfile.ZipFile(f, "r") as zipfp:
239            with zipfp.open(TESTFN) as zo:
240                ziplines = zo.readlines()
241                for line, zipline in zip(self.line_gen, ziplines):
242                    self.assertEqual(zipline, line + '\n')
243
244    def zip_iterlines_test(self, f, compression):
245        self.make_test_archive(f, compression)
246
247        # Read the ZIP archive
248        with zipfile.ZipFile(f, "r") as zipfp:
249            for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
250                self.assertEqual(zipline, line + '\n')
251
252    def test_readline_read_stored(self):
253        # Issue #7610: calls to readline() interleaved with calls to read().
254        for f in (TESTFN2, TemporaryFile(), StringIO()):
255            self.zip_readline_read_test(f, zipfile.ZIP_STORED)
256
257    def test_readline_stored(self):
258        for f in (TESTFN2, TemporaryFile(), StringIO()):
259            self.zip_readline_test(f, zipfile.ZIP_STORED)
260
261    def test_readlines_stored(self):
262        for f in (TESTFN2, TemporaryFile(), StringIO()):
263            self.zip_readlines_test(f, zipfile.ZIP_STORED)
264
265    def test_iterlines_stored(self):
266        for f in (TESTFN2, TemporaryFile(), StringIO()):
267            self.zip_iterlines_test(f, zipfile.ZIP_STORED)
268
269    @skipUnless(zlib, "requires zlib")
270    def test_deflated(self):
271        for f in (TESTFN2, TemporaryFile(), StringIO()):
272            self.zip_test(f, zipfile.ZIP_DEFLATED)
273
274    @skipUnless(zlib, "requires zlib")
275    def test_open_deflated(self):
276        for f in (TESTFN2, TemporaryFile(), StringIO()):
277            self.zip_open_test(f, zipfile.ZIP_DEFLATED)
278
279    @skipUnless(zlib, "requires zlib")
280    def test_random_open_deflated(self):
281        for f in (TESTFN2, TemporaryFile(), StringIO()):
282            self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
283
284    @skipUnless(zlib, "requires zlib")
285    def test_readline_read_deflated(self):
286        # Issue #7610: calls to readline() interleaved with calls to read().
287        for f in (TESTFN2, TemporaryFile(), StringIO()):
288            self.zip_readline_read_test(f, zipfile.ZIP_DEFLATED)
289
290    @skipUnless(zlib, "requires zlib")
291    def test_readline_deflated(self):
292        for f in (TESTFN2, TemporaryFile(), StringIO()):
293            self.zip_readline_test(f, zipfile.ZIP_DEFLATED)
294
295    @skipUnless(zlib, "requires zlib")
296    def test_readlines_deflated(self):
297        for f in (TESTFN2, TemporaryFile(), StringIO()):
298            self.zip_readlines_test(f, zipfile.ZIP_DEFLATED)
299
300    @skipUnless(zlib, "requires zlib")
301    def test_iterlines_deflated(self):
302        for f in (TESTFN2, TemporaryFile(), StringIO()):
303            self.zip_iterlines_test(f, zipfile.ZIP_DEFLATED)
304
305    @skipUnless(zlib, "requires zlib")
306    def test_low_compression(self):
307        """Check for cases where compressed data is larger than original."""
308        # Create the ZIP archive
309        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
310            zipfp.writestr("strfile", '12')
311
312        # Get an open object for strfile
313        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
314            with zipfp.open("strfile") as openobj:
315                self.assertEqual(openobj.read(1), '1')
316                self.assertEqual(openobj.read(1), '2')
317
318    def test_absolute_arcnames(self):
319        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
320            zipfp.write(TESTFN, "/absolute")
321
322        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
323            self.assertEqual(zipfp.namelist(), ["absolute"])
324
325    def test_append_to_zip_file(self):
326        """Test appending to an existing zipfile."""
327        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
328            zipfp.write(TESTFN, TESTFN)
329
330        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
331            zipfp.writestr("strfile", self.data)
332            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
333
334    def test_append_to_non_zip_file(self):
335        """Test appending to an existing file that is not a zipfile."""
336        # NOTE: this test fails if len(d) < 22 because of the first
337        # line "fpin.seek(-22, 2)" in _EndRecData
338        data = 'I am not a ZipFile!'*10
339        with open(TESTFN2, 'wb') as f:
340            f.write(data)
341
342        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
343            zipfp.write(TESTFN, TESTFN)
344
345        with open(TESTFN2, 'rb') as f:
346            f.seek(len(data))
347            with zipfile.ZipFile(f, "r") as zipfp:
348                self.assertEqual(zipfp.namelist(), [TESTFN])
349                self.assertEqual(zipfp.read(TESTFN), self.data)
350        with open(TESTFN2, 'rb') as f:
351            self.assertEqual(f.read(len(data)), data)
352            zipfiledata = f.read()
353        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
354            self.assertEqual(zipfp.namelist(), [TESTFN])
355            self.assertEqual(zipfp.read(TESTFN), self.data)
356
357    def test_read_concatenated_zip_file(self):
358        with io.BytesIO() as bio:
359            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
360                zipfp.write(TESTFN, TESTFN)
361            zipfiledata = bio.getvalue()
362        data = b'I am not a ZipFile!'*10
363        with open(TESTFN2, 'wb') as f:
364            f.write(data)
365            f.write(zipfiledata)
366
367        with zipfile.ZipFile(TESTFN2) as zipfp:
368            self.assertEqual(zipfp.namelist(), [TESTFN])
369            self.assertEqual(zipfp.read(TESTFN), self.data)
370
371    def test_append_to_concatenated_zip_file(self):
372        with io.BytesIO() as bio:
373            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
374                zipfp.write(TESTFN, TESTFN)
375            zipfiledata = bio.getvalue()
376        data = b'I am not a ZipFile!'*1000000
377        with open(TESTFN2, 'wb') as f:
378            f.write(data)
379            f.write(zipfiledata)
380
381        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
382            self.assertEqual(zipfp.namelist(), [TESTFN])
383            zipfp.writestr('strfile', self.data)
384
385        with open(TESTFN2, 'rb') as f:
386            self.assertEqual(f.read(len(data)), data)
387            zipfiledata = f.read()
388        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
389            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
390            self.assertEqual(zipfp.read(TESTFN), self.data)
391            self.assertEqual(zipfp.read('strfile'), self.data)
392
393    def test_ignores_newline_at_end(self):
394        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
395            zipfp.write(TESTFN, TESTFN)
396        with open(TESTFN2, 'a') as f:
397            f.write("\r\n\00\00\00")
398        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
399            self.assertIsInstance(zipfp, zipfile.ZipFile)
400
401    def test_ignores_stuff_appended_past_comments(self):
402        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
403            zipfp.comment = b"this is a comment"
404            zipfp.write(TESTFN, TESTFN)
405        with open(TESTFN2, 'a') as f:
406            f.write("abcdef\r\n")
407        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
408            self.assertIsInstance(zipfp, zipfile.ZipFile)
409            self.assertEqual(zipfp.comment, b"this is a comment")
410
411    def test_write_default_name(self):
412        """Check that calling ZipFile.write without arcname specified
413        produces the expected result."""
414        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
415            zipfp.write(TESTFN)
416            with open(TESTFN,'r') as fid:
417                self.assertEqual(zipfp.read(TESTFN), fid.read())
418
419    @skipUnless(zlib, "requires zlib")
420    def test_per_file_compression(self):
421        """Check that files within a Zip archive can have different
422        compression options."""
423        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
424            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
425            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
426            sinfo = zipfp.getinfo('storeme')
427            dinfo = zipfp.getinfo('deflateme')
428            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
429            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
430
431    def test_write_to_readonly(self):
432        """Check that trying to call write() on a readonly ZipFile object
433        raises a RuntimeError."""
434        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
435            zipfp.writestr("somefile.txt", "bogus")
436
437        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
438            self.assertRaises(RuntimeError, zipfp.write, TESTFN)
439
440    def test_extract(self):
441        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
442            for fpath, fdata in SMALL_TEST_DATA:
443                zipfp.writestr(fpath, fdata)
444
445        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
446            for fpath, fdata in SMALL_TEST_DATA:
447                writtenfile = zipfp.extract(fpath)
448
449                # make sure it was written to the right place
450                correctfile = os.path.join(os.getcwd(), fpath)
451                correctfile = os.path.normpath(correctfile)
452
453                self.assertEqual(writtenfile, correctfile)
454
455                # make sure correct data is in correct file
456                with open(writtenfile, "rb") as fid:
457                    self.assertEqual(fdata, fid.read())
458                os.remove(writtenfile)
459
460        # remove the test file subdirectories
461        rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
462
463    def test_extract_all(self):
464        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
465            for fpath, fdata in SMALL_TEST_DATA:
466                zipfp.writestr(fpath, fdata)
467
468        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
469            zipfp.extractall()
470            for fpath, fdata in SMALL_TEST_DATA:
471                outfile = os.path.join(os.getcwd(), fpath)
472
473                with open(outfile, "rb") as fid:
474                    self.assertEqual(fdata, fid.read())
475                os.remove(outfile)
476
477        # remove the test file subdirectories
478        rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
479
480    def check_file(self, filename, content):
481        self.assertTrue(os.path.isfile(filename))
482        with open(filename, 'rb') as f:
483            self.assertEqual(f.read(), content)
484
485    @skipUnless(TESTFN_UNICODE, "No Unicode filesystem semantics on this platform.")
486    def test_extract_unicode_filenames(self):
487        fnames = [u'foo.txt', os.path.basename(TESTFN_UNICODE)]
488        content = 'Test for unicode filename'
489        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
490            for fname in fnames:
491                zipfp.writestr(fname, content)
492
493        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
494            for fname in fnames:
495                writtenfile = zipfp.extract(fname)
496
497                # make sure it was written to the right place
498                correctfile = os.path.join(os.getcwd(), fname)
499                correctfile = os.path.normpath(correctfile)
500                self.assertEqual(writtenfile, correctfile)
501
502                self.check_file(writtenfile, content)
503                os.remove(writtenfile)
504
505    def test_extract_hackers_arcnames(self):
506        hacknames = [
507            ('../foo/bar', 'foo/bar'),
508            ('foo/../bar', 'foo/bar'),
509            ('foo/../../bar', 'foo/bar'),
510            ('foo/bar/..', 'foo/bar'),
511            ('./../foo/bar', 'foo/bar'),
512            ('/foo/bar', 'foo/bar'),
513            ('/foo/../bar', 'foo/bar'),
514            ('/foo/../../bar', 'foo/bar'),
515        ]
516        if os.path.sep == '\\':
517            hacknames.extend([
518                (r'..\foo\bar', 'foo/bar'),
519                (r'..\/foo\/bar', 'foo/bar'),
520                (r'foo/\..\/bar', 'foo/bar'),
521                (r'foo\/../\bar', 'foo/bar'),
522                (r'C:foo/bar', 'foo/bar'),
523                (r'C:/foo/bar', 'foo/bar'),
524                (r'C://foo/bar', 'foo/bar'),
525                (r'C:\foo\bar', 'foo/bar'),
526                (r'//conky/mountpoint/foo/bar', 'foo/bar'),
527                (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
528                (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
529                (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
530                (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
531                (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
532                (r'//?/C:/foo/bar', 'foo/bar'),
533                (r'\\?\C:\foo\bar', 'foo/bar'),
534                (r'C:/../C:/foo/bar', 'C_/foo/bar'),
535                (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
536                ('../../foo../../ba..r', 'foo/ba..r'),
537            ])
538        else:  # Unix
539            hacknames.extend([
540                ('//foo/bar', 'foo/bar'),
541                ('../../foo../../ba..r', 'foo../ba..r'),
542                (r'foo/..\bar', r'foo/..\bar'),
543            ])
544
545        for arcname, fixedname in hacknames:
546            content = b'foobar' + arcname.encode()
547            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
548                zinfo = zipfile.ZipInfo()
549                # preserve backslashes
550                zinfo.filename = arcname
551                zinfo.external_attr = 0o600 << 16
552                zipfp.writestr(zinfo, content)
553
554            arcname = arcname.replace(os.sep, "/")
555            targetpath = os.path.join('target', 'subdir', 'subsub')
556            correctfile = os.path.join(targetpath, *fixedname.split('/'))
557
558            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
559                writtenfile = zipfp.extract(arcname, targetpath)
560                self.assertEqual(writtenfile, correctfile,
561                                 msg="extract %r" % arcname)
562            self.check_file(correctfile, content)
563            rmtree('target')
564
565            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
566                zipfp.extractall(targetpath)
567            self.check_file(correctfile, content)
568            rmtree('target')
569
570            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
571
572            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
573                writtenfile = zipfp.extract(arcname)
574                self.assertEqual(writtenfile, correctfile,
575                                 msg="extract %r" % arcname)
576            self.check_file(correctfile, content)
577            rmtree(fixedname.split('/')[0])
578
579            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
580                zipfp.extractall()
581            self.check_file(correctfile, content)
582            rmtree(fixedname.split('/')[0])
583
584            os.remove(TESTFN2)
585
586    def test_writestr_compression(self):
587        zipfp = zipfile.ZipFile(TESTFN2, "w")
588        zipfp.writestr("a.txt", "hello world", compress_type=zipfile.ZIP_STORED)
589        if zlib:
590            zipfp.writestr("b.txt", "hello world", compress_type=zipfile.ZIP_DEFLATED)
591
592        info = zipfp.getinfo('a.txt')
593        self.assertEqual(info.compress_type, zipfile.ZIP_STORED)
594
595        if zlib:
596            info = zipfp.getinfo('b.txt')
597            self.assertEqual(info.compress_type, zipfile.ZIP_DEFLATED)
598
599
600    def zip_test_writestr_permissions(self, f, compression):
601        # Make sure that writestr creates files with mode 0600,
602        # when it is passed a name rather than a ZipInfo instance.
603
604        self.make_test_archive(f, compression)
605        with zipfile.ZipFile(f, "r") as zipfp:
606            zinfo = zipfp.getinfo('strfile')
607            self.assertEqual(zinfo.external_attr, 0600 << 16)
608
609    def test_writestr_permissions(self):
610        for f in (TESTFN2, TemporaryFile(), StringIO()):
611            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
612
613    def test_close(self):
614        """Check that the zipfile is closed after the 'with' block."""
615        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
616            for fpath, fdata in SMALL_TEST_DATA:
617                zipfp.writestr(fpath, fdata)
618                self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
619        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
620
621        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
622            self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
623        self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
624
625    def test_close_on_exception(self):
626        """Check that the zipfile is closed if an exception is raised in the
627        'with' block."""
628        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
629            for fpath, fdata in SMALL_TEST_DATA:
630                zipfp.writestr(fpath, fdata)
631
632        try:
633            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
634                raise zipfile.BadZipfile()
635        except zipfile.BadZipfile:
636            self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')
637
638    def test_add_file_before_1980(self):
639        # Set atime and mtime to 1970-01-01
640        os.utime(TESTFN, (0, 0))
641        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
642            self.assertRaises(ValueError, zipfp.write, TESTFN)
643
644    def tearDown(self):
645        unlink(TESTFN)
646        unlink(TESTFN2)
647
648
649class TestZip64InSmallFiles(unittest.TestCase):
650    # These tests test the ZIP64 functionality without using large files,
651    # see test_zipfile64 for proper tests.
652
653    def setUp(self):
654        self._limit = zipfile.ZIP64_LIMIT
655        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
656        zipfile.ZIP64_LIMIT = 1000
657        zipfile.ZIP_FILECOUNT_LIMIT = 9
658
659        line_gen = ("Test of zipfile line %d." % i
660                    for i in range(0, FIXEDTEST_SIZE))
661        self.data = '\n'.join(line_gen)
662
663        # Make a source file with some lines
664        with open(TESTFN, "wb") as fp:
665            fp.write(self.data)
666
667    def large_file_exception_test(self, f, compression):
668        with zipfile.ZipFile(f, "w", compression) as zipfp:
669            self.assertRaises(zipfile.LargeZipFile,
670                              zipfp.write, TESTFN, "another.name")
671
672    def large_file_exception_test2(self, f, compression):
673        with zipfile.ZipFile(f, "w", compression) as zipfp:
674            self.assertRaises(zipfile.LargeZipFile,
675                              zipfp.writestr, "another.name", self.data)
676
677    def test_large_file_exception(self):
678        for f in (TESTFN2, TemporaryFile(), StringIO()):
679            self.large_file_exception_test(f, zipfile.ZIP_STORED)
680            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
681
682    def zip_test(self, f, compression):
683        # Create the ZIP archive
684        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
685            zipfp.write(TESTFN, "another.name")
686            zipfp.write(TESTFN, TESTFN)
687            zipfp.writestr("strfile", self.data)
688
689        # Read the ZIP archive
690        with zipfile.ZipFile(f, "r", compression) as zipfp:
691            self.assertEqual(zipfp.read(TESTFN), self.data)
692            self.assertEqual(zipfp.read("another.name"), self.data)
693            self.assertEqual(zipfp.read("strfile"), self.data)
694
695            # Print the ZIP directory
696            fp = StringIO()
697            stdout = sys.stdout
698            try:
699                sys.stdout = fp
700                zipfp.printdir()
701            finally:
702                sys.stdout = stdout
703
704            directory = fp.getvalue()
705            lines = directory.splitlines()
706            self.assertEqual(len(lines), 4) # Number of files + header
707
708            self.assertIn('File Name', lines[0])
709            self.assertIn('Modified', lines[0])
710            self.assertIn('Size', lines[0])
711
712            fn, date, time_, size = lines[1].split()
713            self.assertEqual(fn, 'another.name')
714            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
715            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
716            self.assertEqual(size, str(len(self.data)))
717
718            # Check the namelist
719            names = zipfp.namelist()
720            self.assertEqual(len(names), 3)
721            self.assertIn(TESTFN, names)
722            self.assertIn("another.name", names)
723            self.assertIn("strfile", names)
724
725            # Check infolist
726            infos = zipfp.infolist()
727            names = [i.filename for i in infos]
728            self.assertEqual(len(names), 3)
729            self.assertIn(TESTFN, names)
730            self.assertIn("another.name", names)
731            self.assertIn("strfile", names)
732            for i in infos:
733                self.assertEqual(i.file_size, len(self.data))
734
735            # check getinfo
736            for nm in (TESTFN, "another.name", "strfile"):
737                info = zipfp.getinfo(nm)
738                self.assertEqual(info.filename, nm)
739                self.assertEqual(info.file_size, len(self.data))
740
741            # Check that testzip doesn't raise an exception
742            zipfp.testzip()
743
744    def test_stored(self):
745        for f in (TESTFN2, TemporaryFile(), StringIO()):
746            self.zip_test(f, zipfile.ZIP_STORED)
747
748    @skipUnless(zlib, "requires zlib")
749    def test_deflated(self):
750        for f in (TESTFN2, TemporaryFile(), StringIO()):
751            self.zip_test(f, zipfile.ZIP_DEFLATED)
752
753    def test_absolute_arcnames(self):
754        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
755                             allowZip64=True) as zipfp:
756            zipfp.write(TESTFN, "/absolute")
757
758        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
759            self.assertEqual(zipfp.namelist(), ["absolute"])
760
761    def test_too_many_files(self):
762        # This test checks that more than 64k files can be added to an archive,
763        # and that the resulting archive can be read properly by ZipFile
764        zipf = zipfile.ZipFile(TESTFN, mode="w", allowZip64=True)
765        zipf.debug = 100
766        numfiles = 15
767        for i in range(numfiles):
768            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
769        self.assertEqual(len(zipf.namelist()), numfiles)
770        zipf.close()
771
772        zipf2 = zipfile.ZipFile(TESTFN, mode="r")
773        self.assertEqual(len(zipf2.namelist()), numfiles)
774        for i in range(numfiles):
775            content = zipf2.read("foo%08d" % i)
776            self.assertEqual(content, "%d" % (i**3 % 57))
777        zipf2.close()
778
779    def test_too_many_files_append(self):
780        zipf = zipfile.ZipFile(TESTFN, mode="w", allowZip64=False)
781        zipf.debug = 100
782        numfiles = 9
783        for i in range(numfiles):
784            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
785        self.assertEqual(len(zipf.namelist()), numfiles)
786        with self.assertRaises(zipfile.LargeZipFile):
787            zipf.writestr("foo%08d" % numfiles, b'')
788        self.assertEqual(len(zipf.namelist()), numfiles)
789        zipf.close()
790
791        zipf = zipfile.ZipFile(TESTFN, mode="a", allowZip64=False)
792        zipf.debug = 100
793        self.assertEqual(len(zipf.namelist()), numfiles)
794        with self.assertRaises(zipfile.LargeZipFile):
795            zipf.writestr("foo%08d" % numfiles, b'')
796        self.assertEqual(len(zipf.namelist()), numfiles)
797        zipf.close()
798
799        zipf = zipfile.ZipFile(TESTFN, mode="a", allowZip64=True)
800        zipf.debug = 100
801        self.assertEqual(len(zipf.namelist()), numfiles)
802        numfiles2 = 15
803        for i in range(numfiles, numfiles2):
804            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
805        self.assertEqual(len(zipf.namelist()), numfiles2)
806        zipf.close()
807
808        zipf2 = zipfile.ZipFile(TESTFN, mode="r")
809        self.assertEqual(len(zipf2.namelist()), numfiles2)
810        for i in range(numfiles2):
811            content = zipf2.read("foo%08d" % i)
812            self.assertEqual(content, "%d" % (i**3 % 57))
813        zipf2.close()
814
815    def tearDown(self):
816        zipfile.ZIP64_LIMIT = self._limit
817        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
818        unlink(TESTFN)
819        unlink(TESTFN2)
820
821
class PyZipFileTests(unittest.TestCase):
    """Tests for PyZipFile.writepy() on modules, packages and directories."""

    def requiresWriteAccess(self, path):
        """Skip the running test unless files can be created inside *path*.

        os.access() is consulted first; a probe file is then really created
        and removed again.  Any failure while creating it skips the test too.
        """
        if not os.access(path, os.W_OK):
            self.skipTest('requires write access to the installed location')
        filename = os.path.join(path, 'test_zipfile.try')
        try:
            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
            os.close(fd)
        except Exception:
            self.skipTest('requires write access to the installed location')
        unlink(filename)

    def _module_source(self):
        # Path of this module's source file: when __file__ names the compiled
        # file, dropping the last character turns '.pyc'/'.pyo' into '.py'.
        fn = __file__
        if fn.endswith(('.pyc', '.pyo')):
            fn = fn[:-1]
        return fn

    def test_write_pyfile(self):
        # writepy() on a single module must add the compiled file rather than
        # the source, both at the archive root and below a given base name.
        self.requiresWriteAccess(os.path.dirname(__file__))
        fn = self._module_source()

        cases = (
            ((fn,), os.path.basename(fn)),
            ((fn, "testpackage"),
             "%s/%s" % ("testpackage", os.path.basename(fn))),
        )
        for args, bn in cases:
            # The temporary file is closed as soon as the archive is done
            # with it instead of lingering until garbage collection.
            with TemporaryFile() as tmp, zipfile.PyZipFile(tmp, "w") as zipfp:
                zipfp.writepy(*args)

                names = zipfp.namelist()
                self.assertNotIn(bn, names)
                self.assertTrue(bn + 'o' in names or bn + 'c' in names)

    def test_write_python_package(self):
        # writepy() on a package directory must add the compiled modules of
        # the package and of its sub-packages.
        import email
        packagedir = os.path.dirname(email.__file__)
        self.requiresWriteAccess(packagedir)

        with TemporaryFile() as tmp, zipfile.PyZipFile(tmp, "w") as zipfp:
            zipfp.writepy(packagedir)

            # Check for a couple of modules at different levels of the
            # hierarchy
            names = zipfp.namelist()
            self.assertTrue('email/__init__.pyo' in names or
                            'email/__init__.pyc' in names)
            self.assertTrue('email/mime/text.pyo' in names or
                            'email/mime/text.pyc' in names)

    def test_write_python_directory(self):
        # writepy() on a plain directory must add the compiled form of every
        # .py file in it and leave other files out.
        os.mkdir(TESTFN2)
        try:
            sources = (("mod1.py", "print(42)\n"),
                       ("mod2.py", "print(42 * 42)\n"),
                       ("mod2.txt", "bla bla bla\n"))
            for name, text in sources:
                with open(os.path.join(TESTFN2, name), "w") as fp:
                    fp.write(text)

            # Close the archive (and its backing file) once the names have
            # been collected; it used to be left open.
            with TemporaryFile() as tmp, zipfile.PyZipFile(tmp, "w") as zipfp:
                zipfp.writepy(TESTFN2)
                names = zipfp.namelist()

            self.assertTrue('mod1.pyc' in names or 'mod1.pyo' in names)
            self.assertTrue('mod2.pyc' in names or 'mod2.pyo' in names)
            self.assertNotIn('mod2.txt', names)

        finally:
            rmtree(TESTFN2)

    def test_write_non_pyfile(self):
        # writepy() must reject a file whose name does not end in .py.
        with TemporaryFile() as tmp, zipfile.PyZipFile(tmp, "w") as zipfp:
            with open(TESTFN, 'w') as fid:
                fid.write('most definitely not a python file')
            try:
                self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
            finally:
                # This class has no tearDown, so remove the scratch file even
                # when the assertion above fails.
                os.remove(TESTFN)
905
906
class OtherTests(unittest.TestCase):
    # Pre-built single-member archives (member name 'afile'), one per
    # compression method, used by the bad-CRC tests below: testzip() is
    # expected to report the member and reading it is expected to fail.
    zips_with_bad_crc = {
        zipfile.ZIP_STORED: (
            b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
            b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
            b'ilehello,AworldP'
            b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
            b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
            b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
            b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
            b'\0\0/\0\0\0\0\0'),
        zipfile.ZIP_DEFLATED: (
            b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
            b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
            b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
            b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
            b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
            b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
            b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00'),
    }

    def test_unicode_filenames(self):
        # Unicode member names, ASCII-only or not, must survive a write/read
        # cycle.
        with zipfile.ZipFile(TESTFN, "w") as zf:
            zf.writestr(u"foo.txt", "Test for unicode filename")
            zf.writestr(u"\xf6.txt", "Test for unicode filename")
            self.assertIsInstance(zf.infolist()[0].filename, unicode)

        with zipfile.ZipFile(TESTFN, "r") as zf:
            self.assertEqual(zf.filelist[0].filename, "foo.txt")
            self.assertEqual(zf.filelist[1].filename, u"\xf6.txt")

    def test_create_non_existent_file_for_append(self):
        # Opening a path that does not exist in 'a' mode must create the
        # archive instead of failing.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

        filename = 'testfile.txt'
        content = 'hello, world. this is some content.'

        try:
            with zipfile.ZipFile(TESTFN, 'a') as zf:
                zf.writestr(filename, content)
        except IOError:
            self.fail('Could not append data to a non-existent zip file.')

        self.assertTrue(os.path.exists(TESTFN))

        with zipfile.ZipFile(TESTFN, 'r') as zf:
            self.assertEqual(zf.read(filename), content)

    def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipfile:
            pass

    def test_is_zip_erroneous_file(self):
        """Check that is_zipfile() correctly identifies non-zip files."""
        # - passing a filename
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertFalse(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertFalse(chk)
        # - passing a file-like object, first positioned at its end and
        #   then rewound to its start
        fp = StringIO()
        fp.write("this is not a legal zip file\n")
        chk = zipfile.is_zipfile(fp)
        self.assertFalse(chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertFalse(chk)

    def test_damaged_zipfile(self):
        """Check that zipfiles with missing bytes at the end raise BadZipfile."""
        # - Create a valid zip file
        fp = io.BytesIO()
        with zipfile.ZipFile(fp, mode="w") as zipf:
            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
        zipfiledata = fp.getvalue()

        # - Now create copies of it, each keeping only the first N bytes, and
        #   make sure a BadZipfile exception is raised when we try to open it
        for N in range(len(zipfiledata)):
            fp = io.BytesIO(zipfiledata[:N])
            self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, fp)

    def test_is_zip_valid_file(self):
        """Check that is_zipfile() correctly identifies zip files."""
        # - passing a filename
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        chk = zipfile.is_zipfile(TESTFN)
        self.assertTrue(chk)
        # - passing a file object
        with open(TESTFN, "rb") as fp:
            chk = zipfile.is_zipfile(fp)
            self.assertTrue(chk)
            fp.seek(0, 0)
            zip_contents = fp.read()
        # - passing a file-like object, first positioned at its end and
        #   then rewound to its start
        fp = StringIO()
        fp.write(zip_contents)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)
        fp.seek(0, 0)
        chk = zipfile.is_zipfile(fp)
        self.assertTrue(chk)

    def test_non_existent_file_raises_IOError(self):
        # make sure we don't raise an AttributeError when a partially-constructed
        # ZipFile instance is finalized; this tests for regression on SF tracker
        # bug #403871.

        # The bug we're testing for caused an AttributeError to be raised
        # when a ZipFile instance was created for a file that did not
        # exist; the .fp member was not initialized but was needed by the
        # __del__() method.  Since the AttributeError is in the __del__(),
        # it is ignored, but the user should be sufficiently annoyed by
        # the message on the output that regression will be noticed
        # quickly.
        self.assertRaises(IOError, zipfile.ZipFile, TESTFN)

    def test_empty_file_raises_BadZipFile(self):
        # Neither an empty file nor a file holding a few bytes of plain text
        # may be accepted as an archive.
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

        with open(TESTFN, 'w') as fp:
            fp.write("short file")
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN)

    def test_closed_zip_raises_RuntimeError(self):
        """Verify that testzip() doesn't swallow inappropriate exceptions."""
        data = StringIO()
        with zipfile.ZipFile(data, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        # This is correct; calling .read on a closed ZipFile should raise
        # a RuntimeError, and so should calling .testzip.  An earlier
        # version of .testzip would swallow this exception (and any other)
        # and report that the first file in the archive was corrupt.
        self.assertRaises(RuntimeError, zipf.read, "foo.txt")
        self.assertRaises(RuntimeError, zipf.open, "foo.txt")
        self.assertRaises(RuntimeError, zipf.testzip)
        self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus")
        with open(TESTFN, 'w') as fid:
            fid.write('zipfile test data')
            self.assertRaises(RuntimeError, zipf.write, TESTFN)

    def test_bad_constructor_mode(self):
        """Check that bad modes passed to ZipFile constructor are caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "q")

    def test_bad_open_mode(self):
        """Check that bad modes passed to ZipFile.open are caught."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            # read the data to make sure the file is there
            zipf.read("foo.txt")
            self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")

    def test_read0(self):
        """Check that calling read(0) on a ZipExtFile object returns an empty
        string and doesn't advance file pointer."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
            # read the data to make sure the file is there
            with zipf.open("foo.txt") as f:
                for i in xrange(FIXEDTEST_SIZE):
                    self.assertEqual(f.read(0), '')

                # Everything must still be available after the empty reads.
                self.assertEqual(f.read(), "O, for a Muse of Fire!")

    def test_open_non_existent_item(self):
        """Check that attempting to call open() for an item that doesn't
        exist in the archive raises a KeyError."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")

    def test_bad_compression_mode(self):
        """Check that bad compression methods passed to the ZipFile
        constructor are caught."""
        self.assertRaises(RuntimeError, zipfile.ZipFile, TESTFN, "w", -1)

    def test_unsupported_compression(self):
        # data is declared as shrunk, but actually deflated
        data = (
            b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
            b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
            b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
            b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
            b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
            b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
        # The archive itself opens fine; only opening the member must fail.
        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
            self.assertRaises(NotImplementedError, zipf.open, 'x')

    def test_null_byte_in_filename(self):
        """Check that a filename containing a null byte is properly
        terminated."""
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.writestr("foo.txt\x00qqq", "O, for a Muse of Fire!")
            self.assertEqual(zipf.namelist(), ['foo.txt'])

    def test_struct_sizes(self):
        """Check that ZIP internal structure sizes are calculated correctly."""
        self.assertEqual(zipfile.sizeEndCentDir, 22)
        self.assertEqual(zipfile.sizeCentralDir, 46)
        self.assertEqual(zipfile.sizeEndCentDir64, 56)
        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)

    def test_comments(self):
        """Check that comments on the archive are handled properly."""

        # check default comment is empty
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            self.assertEqual(zipf.comment, '')
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, '')

        # check a simple short comment
        comment = 'Bravely taking to his feet, he beat a very brave retreat.'
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.comment = comment
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment)

        # check a comment of max length
        comment2 = ''.join(['%d' % (i**3 % 10) for i in xrange((1 << 16)-1)])
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            zipf.comment = comment2
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")

        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment2)

        # check a comment that is too long is truncated (with a warning)
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            with check_warnings(('', UserWarning)):
                zipf.comment = comment2 + 'oops'
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
            self.assertEqual(zipf.comment, comment2)

    def test_change_comment_in_empty_archive(self):
        # A comment set on an archive opened in 'a' mode with no members
        # must be found when the archive is read back.
        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
            self.assertFalse(zipf.filelist)
            zipf.comment = b"this is a comment"
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            self.assertEqual(zipf.comment, b"this is a comment")

    def test_change_comment_in_nonempty_archive(self):
        # A comment set while appending to an archive that already has a
        # member must be found when the archive is read back.
        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
            self.assertTrue(zipf.filelist)
            zipf.comment = b"this is a comment"
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            self.assertEqual(zipf.comment, b"this is a comment")

    def check_testzip_with_bad_crc(self, compression):
        """Tests that files with bad CRCs return their name from testzip."""
        zipdata = self.zips_with_bad_crc[compression]

        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            # testzip returns the name of the first corrupt file, or None
            self.assertEqual('afile', zipf.testzip())

    def test_testzip_with_bad_crc_stored(self):
        self.check_testzip_with_bad_crc(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_testzip_with_bad_crc_deflated(self):
        self.check_testzip_with_bad_crc(zipfile.ZIP_DEFLATED)

    def check_read_with_bad_crc(self, compression):
        """Tests that files with bad CRCs raise a BadZipfile exception when read."""
        zipdata = self.zips_with_bad_crc[compression]

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            self.assertRaises(zipfile.BadZipfile, zipf.read, 'afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                self.assertRaises(zipfile.BadZipfile, corrupt_file.read)

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipfile):
                    while corrupt_file.read(2):
                        pass

    def test_read_with_bad_crc_stored(self):
        self.check_read_with_bad_crc(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_with_bad_crc_deflated(self):
        self.check_read_with_bad_crc(zipfile.ZIP_DEFLATED)

    def check_read_return_size(self, compression):
        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
        # than requested.
        for test_size in (1, 4095, 4096, 4097, 16384):
            # The member is one byte longer than the amount requested.
            file_size = test_size + 1
            junk = getrandbytes(file_size)
            with zipfile.ZipFile(io.BytesIO(), "w", compression) as zipf:
                zipf.writestr('foo', junk)
                with zipf.open('foo', 'r') as fp:
                    buf = fp.read(test_size)
                    self.assertEqual(len(buf), test_size)

    def test_read_return_size_stored(self):
        self.check_read_return_size(zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_return_size_deflated(self):
        self.check_read_return_size(zipfile.ZIP_DEFLATED)

    def test_empty_zipfile(self):
        # Check that creating a file in 'w' or 'a' mode and closing without
        # adding any files to the archives creates a valid empty ZIP file
        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
            pass
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
            zipf.close()
        except zipfile.BadZipfile:
            self.fail("Unable to create empty ZIP file in 'w' mode")

        with zipfile.ZipFile(TESTFN, mode="a") as zipf:
            pass
        try:
            zipf = zipfile.ZipFile(TESTFN, mode="r")
            zipf.close()
        except zipfile.BadZipfile:
            # Only the "not a valid archive" error is turned into a test
            # failure, as in the 'w' case above; a bare except here would
            # also have caught KeyboardInterrupt and SystemExit.
            self.fail("Unable to create empty ZIP file in 'a' mode")

    def test_open_empty_file(self):
        # Issue 1710703: Check that opening a file with less than 22 bytes
        # raises a BadZipfile exception (rather than the previously unhelpful
        # IOError)
        with open(TESTFN, 'w') as f:
            pass
        self.assertRaises(zipfile.BadZipfile, zipfile.ZipFile, TESTFN, 'r')

    def test_create_zipinfo_before_1980(self):
        # A ZipInfo with a date before 1980 must be rejected.
        self.assertRaises(ValueError,
                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))

    def test_zipfile_with_short_extra_field(self):
        """If an extra field in the header is less than 4 bytes, skip it."""
        zipdata = (
            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
        )
        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
            # testzip returns the name of the first corrupt file, or None
            self.assertIsNone(zipf.testzip())

    def tearDown(self):
        # Remove whatever scratch files the test left behind.
        unlink(TESTFN)
        unlink(TESTFN2)
1294
1295
class DecryptionTests(unittest.TestCase):
    """Exercise reading of password-protected archive members.

    The archives are pre-generated literals because the library does not
    support encryption at the moment."""

    # Encrypted archive with the single member 'test.txt'.
    data = (
    'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
    '\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
    '\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
    'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
    '\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
    '\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
    '\x00\x00L\x00\x00\x00\x00\x00' )
    # Encrypted archive with the single member 'zero'.
    data2 = (
    'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
    '\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
    '\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
    'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
    '\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
    '\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
    'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
    '\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    # Expected decrypted contents of 'test.txt' and 'zero' respectively.
    plain = 'zipfile.py encryption test'
    plain2 = '\x00'*512

    def _dump_and_open(self, path, raw):
        # Write the raw archive bytes to *path*, then open it for reading.
        with open(path, "wb") as fp:
            fp.write(raw)
        return zipfile.ZipFile(path, "r")

    def setUp(self):
        # Materialize both literal archives on disk and keep them open.
        self.zip = self._dump_and_open(TESTFN, self.data)
        self.zip2 = self._dump_and_open(TESTFN2, self.data2)

    def tearDown(self):
        # Close each archive before deleting the file that backs it.
        for archive, path in ((self.zip, TESTFN), (self.zip2, TESTFN2)):
            archive.close()
            os.unlink(path)

    def test_no_password(self):
        # Reading an encrypted member without having set a password
        # must raise a RuntimeError
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            with self.assertRaises(RuntimeError):
                archive.read(member)

    def test_bad_password(self):
        # Reading with the wrong password must raise a RuntimeError too.
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            archive.setpassword("perl")
            with self.assertRaises(RuntimeError):
                archive.read(member)

    @skipUnless(zlib, "requires zlib")
    def test_good_password(self):
        # With the right password each member decrypts to its known content.
        cases = ((self.zip, "python", "test.txt", self.plain),
                 (self.zip2, "12345", "zero", self.plain2))
        for archive, password, member, expected in cases:
            archive.setpassword(password)
            self.assertEqual(archive.read(member), expected)
1354
1355
class TestsWithRandomBinaryFiles(unittest.TestCase):
    """Round-trip randomly generated binary data through archives."""

    def setUp(self):
        # Build a payload of a random number of packed floats.
        datacount = randint(16, 64)*1024 + randint(1, 1024)
        self.data = ''.join(struct.pack('<f', random()*randint(-1000, 1000))
                            for i in xrange(datacount))

        # Make a source file holding the binary payload
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def tearDown(self):
        unlink(TESTFN)
        unlink(TESTFN2)

    def make_test_archive(self, f, compression):
        # Create the ZIP archive: the source file is added twice, once under
        # a different name and once under its own.
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            zipfp.write(TESTFN, "another.name")
            zipfp.write(TESTFN, TESTFN)

    def _read_member(self, zipfp, name, next_size):
        # Read member *name* through ZipFile.open() in pieces whose sizes
        # come from calling next_size() before each read; return the data
        # joined back together.
        chunks = []
        with zipfp.open(name) as member:
            while True:
                read_data = member.read(next_size())
                if not read_data:
                    break
                chunks.append(read_data)
        return ''.join(chunks)

    def zip_test(self, f, compression):
        # Both members must come back unchanged from ZipFile.read().
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            testdata = zipfp.read(TESTFN)
            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)

    def test_stored(self):
        # The temporary file is closed deterministically rather than being
        # left to garbage collection (same in the tests below).
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, StringIO()):
                self.zip_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_deflated(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, io.BytesIO()):
                self.zip_test(f, zipfile.ZIP_DEFLATED)

    def zip_open_test(self, f, compression):
        # Both members must come back unchanged when read through
        # ZipFile.open() in fixed 256-byte pieces.
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            testdata1 = self._read_member(zipfp, TESTFN, lambda: 256)
            testdata2 = self._read_member(zipfp, "another.name", lambda: 256)

            self.assertEqual(len(testdata1), len(self.data))
            self.assertEqual(testdata1, self.data)

            self.assertEqual(len(testdata2), len(self.data))
            self.assertEqual(testdata2, self.data)

    def test_open_stored(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, StringIO()):
                self.zip_open_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_open_deflated(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, io.BytesIO()):
                self.zip_open_test(f, zipfile.ZIP_DEFLATED)

    def zip_random_open_test(self, f, compression):
        # The member must come back unchanged when read through
        # ZipFile.open() in pieces of random size (1 to 1024 bytes).
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            testdata = self._read_member(zipfp, TESTFN,
                                         lambda: randint(1, 1024))
            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)

    def test_random_open_stored(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, StringIO()):
                self.zip_random_open_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_random_open_deflated(self):
        with TemporaryFile() as tmp:
            for f in (TESTFN2, tmp, io.BytesIO()):
                self.zip_random_open_test(f, zipfile.ZIP_DEFLATED)
1458
1459
@skipUnless(zlib, "requires zlib")
class TestsWithMultipleOpens(unittest.TestCase):
    # Checks that several member handles obtained from one ZipFile can be
    # used side by side, and that reads and writes can be mixed.

    @classmethod
    def setUpClass(cls):
        # Two random payloads, each tagged with a distinct 3-byte prefix.
        cls.data1 = b'111' + getrandbytes(10000)
        cls.data2 = b'222' + getrandbytes(10000)

    def tearDown(self):
        unlink(TESTFN2)

    def make_test_archive(self, f):
        # Write a deflated archive holding the members 'ones' and 'twos'.
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
            archive.writestr('twos', self.data2)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file
        # objects) the same member can be opened twice and both handles
        # read independently.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            with archive.open('ones') as first:
                with archive.open('ones') as second:
                    got_first = first.read(500)
                    got_second = second.read(500)
                    got_first += first.read()
                    got_second += second.read()
            self.assertEqual(got_first, got_second)
            self.assertEqual(got_first, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file
        # objects) two different members can be read with alternating
        # read() calls.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            with archive.open('ones') as first:
                with archive.open('twos') as second:
                    got_first = first.read(500)
                    got_second = second.read(500)
                    got_first += first.read()
                    got_second += second.read()
            self.assertEqual(got_first, self.data1)
            self.assertEqual(got_second, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file
        # objects) a second member can be opened after reading from the
        # first one has already started.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            with archive.open('ones') as first:
                got_first = first.read(500)
                with archive.open('twos') as second:
                    got_second = second.read(500)
                    got_first += first.read()
                    got_second += second.read()
            self.assertEqual(got_first, self.data1)
            self.assertEqual(got_second, self.data2)

    def test_read_after_close(self):
        # Member handles opened before the ZipFile is closed are read
        # after the ZipFile's with-block has exited.
        self.make_test_archive(TESTFN2)
        first = second = None
        try:
            with zipfile.ZipFile(TESTFN2, 'r') as archive:
                first = archive.open('ones')
                second = archive.open('twos')
            got_first = first.read(500)
            got_second = second.read(500)
            got_first += first.read()
            got_second += second.read()
        finally:
            # Close whichever handles were actually obtained.
            for handle in (first, second):
                if handle:
                    handle.close()
        self.assertEqual(got_first, self.data1)
        self.assertEqual(got_second, self.data2)

    def test_read_after_write(self):
        # Read part of a member back while the archive is still open for
        # writing, then re-open the archive and check both members.
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
            archive.writestr('twos', self.data2)
            with archive.open('ones') as member:
                head = member.read(500)
        self.assertEqual(head, self.data1[:500])
        with zipfile.ZipFile(TESTFN2, 'r') as archive:
            got_first = archive.read('ones')
            got_second = archive.read('twos')
        self.assertEqual(got_first, self.data1)
        self.assertEqual(got_second, self.data2)

    def test_write_after_read(self):
        # Write a second member while a read handle on the first member
        # is still open, then re-open the archive and check both members.
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
            with archive.open('ones') as member:
                member.read(500)
                archive.writestr('twos', self.data2)
        with zipfile.ZipFile(TESTFN2, 'r') as archive:
            got_first = archive.read('ones')
            got_second = archive.read('twos')
        self.assertEqual(got_first, self.data1)
        self.assertEqual(got_second, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            for _ in range(100):
                archive.read('ones')
                with archive.open('ones'):
                    pass
        # If descriptors had leaked, a newly opened file would be handed
        # a descriptor number of 100 or more.
        with open(os.devnull) as probe:
            self.assertLess(probe.fileno(), 100)
1571
1572
class TestWithDirectory(unittest.TestCase):
    # Tests for storing directory entries in ZIP archives and extracting
    # them again.  TESTFN2 is created as a scratch directory for each test
    # and removed afterwards; TESTFN is used as the archive file name.

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        # Extracting zipdir.zip must create the nested path a/b/c.
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        # Add a directory with ZipFile.write(), once under its own path
        # and once under the explicit archive name "y", then check the
        # recorded entries both before and after re-opening the archive.
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        # Expected external_attr: the directory's st_mode in the upper 16
        # bits, with the 0x10 directory attribute bit set in the lower part.
        mode = os.stat(dirpath).st_mode & 0xFFFF
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # assertEqual, not assertTrue: with assertTrue the "y/" would
            # only be the failure message and the name would go unchecked.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            # Two top-level entries: "y" and the first component of the
            # path under which the directory was stored the first time.
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        # Add a directory entry with ZipFile.writestr() by giving it a
        # name ending in "/" and empty data, then check it round-trips.
        os.mkdir(os.path.join(TESTFN2, "x"))
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def tearDown(self):
        rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
1636
1637
class UniversalNewlineTests(unittest.TestCase):
    # Tests for reading archive members opened in "rU" mode.  The same
    # lines are stored three times, once per line separator ('\r', '\r\n'
    # and '\n'), and the line-oriented checks expect every separator to be
    # returned as '\n'.

    def setUp(self):
        self.line_gen = ["Test of zipfile line %d." % i
                         for i in xrange(FIXEDTEST_SIZE)]
        self.seps = ('\r', '\r\n', '\n')
        # Both dicts are keyed by separator: arcdata holds the file
        # contents, arcfiles the name of the file written to disk.
        self.arcdata, self.arcfiles = {}, {}
        for n, s in enumerate(self.seps):
            self.arcdata[s] = s.join(self.line_gen) + s
            self.arcfiles[s] = '%s-%d' % (TESTFN, n)
            with open(self.arcfiles[s], "wb") as fid:
                fid.write(self.arcdata[s])

    def make_test_archive(self, f, compression):
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for fn in self.arcfiles.values():
                zipfp.write(fn, fn)

    def read_test(self, f, compression):
        # A plain read() is compared with the data exactly as written,
        # i.e. with the member's own separator.
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for sep, fn in self.arcfiles.items():
                with zipfp.open(fn, "rU") as fp:
                    zipdata = fp.read()
                self.assertEqual(self.arcdata[sep], zipdata)

    def readline_read_test(self, f, compression):
        # Alternate readline() and read(5) calls until the member is
        # exhausted; the concatenated result must equal the '\n' variant.
        self.make_test_archive(f, compression)

        # Read the ZIP archive.  The with-statement closes the archive
        # even when an assertion below fails.
        with zipfile.ZipFile(f, "r") as zipfp:
            for fn in self.arcfiles.values():
                with zipfp.open(fn, "rU") as zipopen:
                    pieces = []
                    while True:
                        read = zipopen.readline()
                        if not read:
                            break
                        pieces.append(read)

                        read = zipopen.read(5)
                        if not read:
                            break
                        pieces.append(read)
                    data = ''.join(pieces)

                self.assertEqual(data, self.arcdata['\n'])

    def readline_test(self, f, compression):
        # Each readline() call must return the next line ending in '\n'.
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for fn in self.arcfiles.values():
                with zipfp.open(fn, "rU") as zipopen:
                    for line in self.line_gen:
                        linedata = zipopen.readline()
                        self.assertEqual(linedata, line + '\n')

    def readlines_test(self, f, compression):
        # Lines returned by readlines() must match the source lines.
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for fn in self.arcfiles.values():
                with zipfp.open(fn, "rU") as fp:
                    ziplines = fp.readlines()
                for line, zipline in zip(self.line_gen, ziplines):
                    self.assertEqual(zipline, line + '\n')

    def iterlines_test(self, f, compression):
        # Iterating over the member must yield the source lines.
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r") as zipfp:
            for fn in self.arcfiles.values():
                with zipfp.open(fn, "rU") as fid:
                    for line, zipline in zip(self.line_gen, fid):
                        self.assertEqual(zipline, line + '\n')

    def test_read_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.read_test(f, zipfile.ZIP_STORED)

    def test_readline_read_stored(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_read_test(f, zipfile.ZIP_STORED)

    def test_readline_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_test(f, zipfile.ZIP_STORED)

    def test_readlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readlines_test(f, zipfile.ZIP_STORED)

    def test_iterlines_stored(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.iterlines_test(f, zipfile.ZIP_STORED)

    @skipUnless(zlib, "requires zlib")
    def test_read_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.read_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_read_deflated(self):
        # Issue #7610: calls to readline() interleaved with calls to read().
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_read_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readline_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readline_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_readlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.readlines_test(f, zipfile.ZIP_DEFLATED)

    @skipUnless(zlib, "requires zlib")
    def test_iterlines_deflated(self):
        for f in (TESTFN2, TemporaryFile(), StringIO()):
            self.iterlines_test(f, zipfile.ZIP_DEFLATED)

    def tearDown(self):
        # Remove the per-separator source files and any archive written.
        for fn in self.arcfiles.values():
            os.remove(fn)
        unlink(TESTFN)
        unlink(TESTFN2)
1773
1774
class CommandLineTest(unittest.TestCase):
    # Tests for the command-line interface reached via "-m zipfile".

    def zipfilecmd(self, *args, **kwargs):
        # Run "-m zipfile" with the given arguments through
        # assert_python_ok and return its stdout, with the platform line
        # separator replaced by b'\n'.
        result = script_helper.assert_python_ok('-m', 'zipfile', *args,
                                                **kwargs)
        _, stdout, _ = result
        native_eol = os.linesep.encode()
        return stdout.replace(native_eol, b'\n')

    def zipfilecmd_failure(self, *args):
        # Run "-m zipfile" through assert_python_failure and hand back
        # whatever that helper returns.
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_test_command(self):
        # "-t" on a real archive reports success ...
        good_archive = findfile('zipdir.zip')
        output = self.zipfilecmd('-t', good_archive)
        self.assertEqual(output.rstrip(), b'Done testing')
        # ... while a tar file makes the command fail with empty stdout.
        not_a_zip = findfile('testtar.tar')
        _, output, _ = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(output, b'')

    def test_list_command(self):
        # "-l" must print the same listing as ZipFile.printdir().
        archive = findfile('zipdir.zip')
        with captured_stdout() as listing:
            with zipfile.ZipFile(archive, 'r') as zf:
                zf.printdir()
        expected = listing.getvalue().encode('ascii', 'backslashreplace')
        actual = self.zipfilecmd('-l', archive,
                                 PYTHONIOENCODING='ascii:backslashreplace')
        self.assertEqual(actual, expected)

    @skipUnless(zlib, "requires zlib")
    def test_create_command(self):
        # "-c" given a file and a directory must store the file, the
        # directory entry and the file inside the directory.
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w') as plain:
            plain.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested_path = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested_path, 'w') as nested:
            nested.write('test 2')
        expected_names = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        try:
            output = self.zipfilecmd('-c', TESTFN2, TESTFN, TESTFNDIR)
            self.assertEqual(output, b'')
            with zipfile.ZipFile(TESTFN2) as zf:
                self.assertEqual(zf.namelist(), expected_names)
                self.assertEqual(zf.read(expected_names[0]), b'test 1')
                self.assertEqual(zf.read(expected_names[2]), b'test 2')
        finally:
            unlink(TESTFN2)

    def test_extract_command(self):
        # "-e" must recreate every archive entry below the target
        # directory: directories as directories, files with equal content.
        archive = findfile('zipdir.zip')
        os.mkdir(TESTFNDIR)
        try:
            output = self.zipfilecmd('-e', archive, TESTFNDIR)
            self.assertEqual(output, b'')
            with zipfile.ZipFile(archive) as zf:
                for info in zf.infolist():
                    # Archive names use '/', the file system uses os.sep.
                    native_name = info.filename.replace('/', os.sep)
                    extracted = os.path.join(TESTFNDIR, native_name)
                    if info.filename.endswith('/'):
                        self.assertTrue(os.path.isdir(extracted))
                        continue
                    self.assertTrue(os.path.isfile(extracted))
                    with open(extracted, 'rb') as fp:
                        self.assertEqual(fp.read(), zf.read(info))
        finally:
            rmtree(TESTFNDIR)
1842
def test_main():
    # Hand the listed test case classes to run_unittest().
    test_classes = (
        TestsWithSourceFile,
        TestZip64InSmallFiles,
        OtherTests,
        PyZipFileTests,
        DecryptionTests,
        TestsWithMultipleOpens,
        TestWithDirectory,
        UniversalNewlineTests,
        TestsWithRandomBinaryFiles,
        CommandLineTest,
    )
    run_unittest(*test_classes)
1848
1849
# Run the tests through test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1852