# Tests of the full ZIP64 functionality of zipfile
# The support.requires call is the only reason for keeping this separate
# from test_zipfile
from test import support

# XXX(nnorwitz): disable this test by looking for extralargefile resource,
# which doesn't exist.  This test takes over 30 minutes to run in general
# and requires more disk space than most of the buildbots.
# The check runs before the remaining imports and definitions below.
support.requires(
        'extralargefile',
        'test requires loads of disk-space bytes and a long time to run'
    )

import zipfile
import os
import unittest
import time
import sys

from io import StringIO
from tempfile import TemporaryFile

from test.support import TESTFN, requires_zlib

# Second scratch file name, derived from TESTFN.  TestsWithSourceFile writes
# its on-disk archives there while TESTFN holds the raw source data.
TESTFN2 = TESTFN + "2"

# How much time in seconds can pass before we print a 'Still working' message.
_PRINT_WORKING_MSG_INTERVAL = 5 * 60

class TestsWithSourceFile(unittest.TestCase):
    """Round-trip about 6GB of raw data through a ZIP archive.

    setUp generates a block of text and writes it to TESTFN; each test then
    stores many copies of that block in an archive and reads every copy back.
    """

    def setUp(self):
        # Create test data.
        line_gen = ("Test of zipfile line %d." % i for i in range(1000000))
        self.data = '\n'.join(line_gen).encode('ascii')

        # And write it to a file.  The with block closes the file even when
        # the write itself fails.
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def _reportProgress(self, next_time, action, num, filecount):
        """Print a 'still working' message once *next_time* has passed.

        *action* is the verb shown in the message ('writing' or 'reading').
        Returns the time at which the next message is due: a new deadline if
        a message was printed, otherwise *next_time* unchanged.
        """
        if next_time > time.time():
            return next_time
        next_time = time.time() + _PRINT_WORKING_MSG_INTERVAL
        # Write to the real stdout so the message shows up even if
        # sys.stdout has been replaced.
        print((
           '  zipTest still %s %d of %d, be patient...' %
           (action, num, filecount)), file=sys.__stdout__)
        sys.__stdout__.flush()
        return next_time

    def zipTest(self, f, compression):
        """Fill archive *f* using *compression*, then verify every member.

        *f* is passed straight to zipfile.ZipFile; the tests in this class
        supply both an open temporary file and the TESTFN2 path.
        """
        # It will contain enough copies of self.data to reach about 6GB of
        # raw data to store.
        filecount = 6*1024**3 // len(self.data)

        next_time = time.time() + _PRINT_WORKING_MSG_INTERVAL

        # Create the ZIP archive.  The with block closes the archive even
        # if a write fails, so no handle on the huge file is left behind.
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for num in range(filecount):
                zipfp.writestr("testfn%d" % num, self.data)
                # Print still working message since this test can be
                # really slow
                next_time = self._reportProgress(
                    next_time, 'writing', num, filecount)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            for num in range(filecount):
                self.assertEqual(zipfp.read("testfn%d" % num), self.data)
                # Print still working message since this test can be
                # really slow
                next_time = self._reportProgress(
                    next_time, 'reading', num, filecount)

    def _zipTestBothTargets(self, compression):
        """Run zipTest on a temporary file and then on TESTFN2."""
        # Try the temp file first.  If we do TESTFN2 first, then it hogs
        # gigabytes of disk space for the duration of the test.
        with TemporaryFile() as f:
            self.zipTest(f, compression)
            # ZipFile must not close a file object it was handed.
            self.assertFalse(f.closed)
        self.zipTest(TESTFN2, compression)

    def testStored(self):
        self._zipTestBothTargets(zipfile.ZIP_STORED)

    @requires_zlib
    def testDeflated(self):
        self._zipTestBothTargets(zipfile.ZIP_DEFLATED)

    def tearDown(self):
        # Same cleanup helper as OtherTests.tearDown.
        for fname in TESTFN, TESTFN2:
            support.unlink(fname)


class OtherTests(unittest.TestCase):
    """Tests for archives holding more than 64k members."""

    def _writeMembers(self, zipf, start, stop):
        """Add members numbered start..stop-1 to the open archive *zipf*.

        Member i is named "foo%08d" % i and holds the text of i**3 % 57.
        """
        for i in range(start, stop):
            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))

    def _checkArchive(self, numfiles):
        """Reopen TESTFN for reading and verify members 0..numfiles-1."""
        with zipfile.ZipFile(TESTFN, mode="r") as zipf2:
            self.assertEqual(len(zipf2.namelist()), numfiles)
            for i in range(numfiles):
                content = zipf2.read("foo%08d" % i).decode('ascii')
                self.assertEqual(content, "%d" % (i**3 % 57))

    def testMoreThan64kFiles(self):
        # This test checks that more than 64k files can be added to an archive,
        # and that the resulting archive can be read properly by ZipFile
        numfiles = (1 << 16) * 3//2
        # The with block closes the archive even if an assertion fails.
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=True) as zipf:
            zipf.debug = 100
            self._writeMembers(zipf, 0, numfiles)
            self.assertEqual(len(zipf.namelist()), numfiles)

        self._checkArchive(numfiles)

    def testMoreThan64kFilesAppend(self):
        # Without ZIP64, adding one member beyond (1 << 16) - 1 must raise
        # LargeZipFile and leave the archive unchanged, both for a freshly
        # written archive and for one reopened in append mode.  Appending
        # with allowZip64=True must then succeed.
        numfiles = (1 << 16) - 1
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=False) as zipf:
            zipf.debug = 100
            self._writeMembers(zipf, 0, numfiles)
            self.assertEqual(len(zipf.namelist()), numfiles)
            with self.assertRaises(zipfile.LargeZipFile):
                zipf.writestr("foo%08d" % numfiles, b'')
            self.assertEqual(len(zipf.namelist()), numfiles)

        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=False) as zipf:
            zipf.debug = 100
            self.assertEqual(len(zipf.namelist()), numfiles)
            with self.assertRaises(zipfile.LargeZipFile):
                zipf.writestr("foo%08d" % numfiles, b'')
            self.assertEqual(len(zipf.namelist()), numfiles)

        numfiles2 = (1 << 16) * 3//2
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=True) as zipf:
            zipf.debug = 100
            self.assertEqual(len(zipf.namelist()), numfiles)
            self._writeMembers(zipf, numfiles, numfiles2)
            self.assertEqual(len(zipf.namelist()), numfiles2)

        self._checkArchive(numfiles2)

    def tearDown(self):
        support.unlink(TESTFN)
        support.unlink(TESTFN2)

# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()