1"""Test script for the gzip module.
2"""
3
4import unittest
5from test import support
6from test.support import bigmemtest, _4G
7import os
8import pathlib
9import io
10import struct
11import array
12gzip = support.import_module('gzip')
13
# Sample payload: a short fragment of C source text.  The tests usually
# repeat it many times (e.g. data1 * 50) to get a larger compressible input.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

# Second sample payload (C comment lines), used where a test needs content
# that differs from data1, e.g. when appending to an existing file.
data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""
25
26
class UnseekableIO(io.BytesIO):
    """BytesIO variant that reports itself as non-seekable.

    Reading and writing behave as for io.BytesIO, but every positioning
    operation is rejected with io.UnsupportedOperation.
    """

    def seekable(self):
        """Always report that random access is unavailable."""
        return False

    def tell(self):
        """Refuse to report the stream position."""
        raise io.UnsupportedOperation

    def seek(self, *_unused):
        """Refuse to reposition the stream, whatever the arguments."""
        raise io.UnsupportedOperation
36
37
class BaseTest(unittest.TestCase):
    """Common fixture: each test starts and ends without the scratch file."""

    # Path of the scratch file shared by all tests.
    filename = support.TESTFN

    def _discard_scratch_file(self):
        # Remove the scratch file via the test-support helper.
        support.unlink(self.filename)

    def setUp(self):
        self._discard_scratch_file()

    def tearDown(self):
        self._discard_scratch_file()
46
47
48class TestGzip(BaseTest):
49    def write_and_read_back(self, data, mode='b'):
50        b_data = bytes(data)
51        with gzip.GzipFile(self.filename, 'w'+mode) as f:
52            l = f.write(data)
53        self.assertEqual(l, len(b_data))
54        with gzip.GzipFile(self.filename, 'r'+mode) as f:
55            self.assertEqual(f.read(), b_data)
56
57    def test_write(self):
58        with gzip.GzipFile(self.filename, 'wb') as f:
59            f.write(data1 * 50)
60
61            # Try flush and fileno.
62            f.flush()
63            f.fileno()
64            if hasattr(os, 'fsync'):
65                os.fsync(f.fileno())
66            f.close()
67
68        # Test multiple close() calls.
69        f.close()
70
71    def test_write_read_with_pathlike_file(self):
72        filename = pathlib.Path(self.filename)
73        with gzip.GzipFile(filename, 'w') as f:
74            f.write(data1 * 50)
75        self.assertIsInstance(f.name, str)
76        with gzip.GzipFile(filename, 'a') as f:
77            f.write(data1)
78        with gzip.GzipFile(filename) as f:
79            d = f.read()
80        self.assertEqual(d, data1 * 51)
81        self.assertIsInstance(f.name, str)
82
83    # The following test_write_xy methods test that write accepts
84    # the corresponding bytes-like object type as input
85    # and that the data written equals bytes(xy) in all cases.
86    def test_write_memoryview(self):
87        self.write_and_read_back(memoryview(data1 * 50))
88        m = memoryview(bytes(range(256)))
89        data = m.cast('B', shape=[8,8,4])
90        self.write_and_read_back(data)
91
92    def test_write_bytearray(self):
93        self.write_and_read_back(bytearray(data1 * 50))
94
95    def test_write_array(self):
96        self.write_and_read_back(array.array('I', data1 * 40))
97
98    def test_write_incompatible_type(self):
99        # Test that non-bytes-like types raise TypeError.
100        # Issue #21560: attempts to write incompatible types
101        # should not affect the state of the fileobject
102        with gzip.GzipFile(self.filename, 'wb') as f:
103            with self.assertRaises(TypeError):
104                f.write('')
105            with self.assertRaises(TypeError):
106                f.write([])
107            f.write(data1)
108        with gzip.GzipFile(self.filename, 'rb') as f:
109            self.assertEqual(f.read(), data1)
110
111    def test_read(self):
112        self.test_write()
113        # Try reading.
114        with gzip.GzipFile(self.filename, 'r') as f:
115            d = f.read()
116        self.assertEqual(d, data1*50)
117
118    def test_read1(self):
119        self.test_write()
120        blocks = []
121        nread = 0
122        with gzip.GzipFile(self.filename, 'r') as f:
123            while True:
124                d = f.read1()
125                if not d:
126                    break
127                blocks.append(d)
128                nread += len(d)
129                # Check that position was updated correctly (see issue10791).
130                self.assertEqual(f.tell(), nread)
131        self.assertEqual(b''.join(blocks), data1 * 50)
132
133    @bigmemtest(size=_4G, memuse=1)
134    def test_read_large(self, size):
135        # Read chunk size over UINT_MAX should be supported, despite zlib's
136        # limitation per low-level call
137        compressed = gzip.compress(data1, compresslevel=1)
138        f = gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb')
139        self.assertEqual(f.read(size), data1)
140
141    def test_io_on_closed_object(self):
142        # Test that I/O operations on closed GzipFile objects raise a
143        # ValueError, just like the corresponding functions on file objects.
144
145        # Write to a file, open it for reading, then close it.
146        self.test_write()
147        f = gzip.GzipFile(self.filename, 'r')
148        fileobj = f.fileobj
149        self.assertFalse(fileobj.closed)
150        f.close()
151        self.assertTrue(fileobj.closed)
152        with self.assertRaises(ValueError):
153            f.read(1)
154        with self.assertRaises(ValueError):
155            f.seek(0)
156        with self.assertRaises(ValueError):
157            f.tell()
158        # Open the file for writing, then close it.
159        f = gzip.GzipFile(self.filename, 'w')
160        fileobj = f.fileobj
161        self.assertFalse(fileobj.closed)
162        f.close()
163        self.assertTrue(fileobj.closed)
164        with self.assertRaises(ValueError):
165            f.write(b'')
166        with self.assertRaises(ValueError):
167            f.flush()
168
169    def test_append(self):
170        self.test_write()
171        # Append to the previous file
172        with gzip.GzipFile(self.filename, 'ab') as f:
173            f.write(data2 * 15)
174
175        with gzip.GzipFile(self.filename, 'rb') as f:
176            d = f.read()
177        self.assertEqual(d, (data1*50) + (data2*15))
178
179    def test_many_append(self):
180        # Bug #1074261 was triggered when reading a file that contained
181        # many, many members.  Create such a file and verify that reading it
182        # works.
183        with gzip.GzipFile(self.filename, 'wb', 9) as f:
184            f.write(b'a')
185        for i in range(0, 200):
186            with gzip.GzipFile(self.filename, "ab", 9) as f: # append
187                f.write(b'a')
188
189        # Try reading the file
190        with gzip.GzipFile(self.filename, "rb") as zgfile:
191            contents = b""
192            while 1:
193                ztxt = zgfile.read(8192)
194                contents += ztxt
195                if not ztxt: break
196        self.assertEqual(contents, b'a'*201)
197
198    def test_exclusive_write(self):
199        with gzip.GzipFile(self.filename, 'xb') as f:
200            f.write(data1 * 50)
201        with gzip.GzipFile(self.filename, 'rb') as f:
202            self.assertEqual(f.read(), data1 * 50)
203        with self.assertRaises(FileExistsError):
204            gzip.GzipFile(self.filename, 'xb')
205
206    def test_buffered_reader(self):
207        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
208        # performance.
209        self.test_write()
210
211        with gzip.GzipFile(self.filename, 'rb') as f:
212            with io.BufferedReader(f) as r:
213                lines = [line for line in r]
214
215        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))
216
217    def test_readline(self):
218        self.test_write()
219        # Try .readline() with varying line lengths
220
221        with gzip.GzipFile(self.filename, 'rb') as f:
222            line_length = 0
223            while 1:
224                L = f.readline(line_length)
225                if not L and line_length != 0: break
226                self.assertTrue(len(L) <= line_length)
227                line_length = (line_length + 1) % 50
228
229    def test_readlines(self):
230        self.test_write()
231        # Try .readlines()
232
233        with gzip.GzipFile(self.filename, 'rb') as f:
234            L = f.readlines()
235
236        with gzip.GzipFile(self.filename, 'rb') as f:
237            while 1:
238                L = f.readlines(150)
239                if L == []: break
240
241    def test_seek_read(self):
242        self.test_write()
243        # Try seek, read test
244
245        with gzip.GzipFile(self.filename) as f:
246            while 1:
247                oldpos = f.tell()
248                line1 = f.readline()
249                if not line1: break
250                newpos = f.tell()
251                f.seek(oldpos)  # negative seek
252                if len(line1)>10:
253                    amount = 10
254                else:
255                    amount = len(line1)
256                line2 = f.read(amount)
257                self.assertEqual(line1[:amount], line2)
258                f.seek(newpos)  # positive seek
259
260    def test_seek_whence(self):
261        self.test_write()
262        # Try seek(whence=1), read test
263
264        with gzip.GzipFile(self.filename) as f:
265            f.read(10)
266            f.seek(10, whence=1)
267            y = f.read(10)
268        self.assertEqual(y, data1[20:30])
269
270    def test_seek_write(self):
271        # Try seek, write test
272        with gzip.GzipFile(self.filename, 'w') as f:
273            for pos in range(0, 256, 16):
274                f.seek(pos)
275                f.write(b'GZ\n')
276
277    def test_mode(self):
278        self.test_write()
279        with gzip.GzipFile(self.filename, 'r') as f:
280            self.assertEqual(f.myfileobj.mode, 'rb')
281        support.unlink(self.filename)
282        with gzip.GzipFile(self.filename, 'x') as f:
283            self.assertEqual(f.myfileobj.mode, 'xb')
284
285    def test_1647484(self):
286        for mode in ('wb', 'rb'):
287            with gzip.GzipFile(self.filename, mode) as f:
288                self.assertTrue(hasattr(f, "name"))
289                self.assertEqual(f.name, self.filename)
290
291    def test_paddedfile_getattr(self):
292        self.test_write()
293        with gzip.GzipFile(self.filename, 'rb') as f:
294            self.assertTrue(hasattr(f.fileobj, "name"))
295            self.assertEqual(f.fileobj.name, self.filename)
296
297    def test_mtime(self):
298        mtime = 123456789
299        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
300            fWrite.write(data1)
301        with gzip.GzipFile(self.filename) as fRead:
302            self.assertTrue(hasattr(fRead, 'mtime'))
303            self.assertIsNone(fRead.mtime)
304            dataRead = fRead.read()
305            self.assertEqual(dataRead, data1)
306            self.assertEqual(fRead.mtime, mtime)
307
308    def test_metadata(self):
309        mtime = 123456789
310
311        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
312            fWrite.write(data1)
313
314        with open(self.filename, 'rb') as fRead:
315            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
316
317            idBytes = fRead.read(2)
318            self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID
319
320            cmByte = fRead.read(1)
321            self.assertEqual(cmByte, b'\x08') # deflate
322
323            flagsByte = fRead.read(1)
324            self.assertEqual(flagsByte, b'\x08') # only the FNAME flag is set
325
326            mtimeBytes = fRead.read(4)
327            self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
328
329            xflByte = fRead.read(1)
330            self.assertEqual(xflByte, b'\x02') # maximum compression
331
332            osByte = fRead.read(1)
333            self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent)
334
335            # Since the FNAME flag is set, the zero-terminated filename follows.
336            # RFC 1952 specifies that this is the name of the input file, if any.
337            # However, the gzip module defaults to storing the name of the output
338            # file in this field.
339            expected = self.filename.encode('Latin-1') + b'\x00'
340            nameBytes = fRead.read(len(expected))
341            self.assertEqual(nameBytes, expected)
342
343            # Since no other flags were set, the header ends here.
344            # Rather than process the compressed data, let's seek to the trailer.
345            fRead.seek(os.stat(self.filename).st_size - 8)
346
347            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
348            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')
349
350            isizeBytes = fRead.read(4)
351            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
352
353    def test_with_open(self):
354        # GzipFile supports the context management protocol
355        with gzip.GzipFile(self.filename, "wb") as f:
356            f.write(b"xxx")
357        f = gzip.GzipFile(self.filename, "rb")
358        f.close()
359        try:
360            with f:
361                pass
362        except ValueError:
363            pass
364        else:
365            self.fail("__enter__ on a closed file didn't raise an exception")
366        try:
367            with gzip.GzipFile(self.filename, "wb") as f:
368                1/0
369        except ZeroDivisionError:
370            pass
371        else:
372            self.fail("1/0 didn't raise an exception")
373
374    def test_zero_padded_file(self):
375        with gzip.GzipFile(self.filename, "wb") as f:
376            f.write(data1 * 50)
377
378        # Pad the file with zeroes
379        with open(self.filename, "ab") as f:
380            f.write(b"\x00" * 50)
381
382        with gzip.GzipFile(self.filename, "rb") as f:
383            d = f.read()
384            self.assertEqual(d, data1 * 50, "Incorrect data in file")
385
386    def test_non_seekable_file(self):
387        uncompressed = data1 * 50
388        buf = UnseekableIO()
389        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
390            f.write(uncompressed)
391        compressed = buf.getvalue()
392        buf = UnseekableIO(compressed)
393        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
394            self.assertEqual(f.read(), uncompressed)
395
396    def test_peek(self):
397        uncompressed = data1 * 200
398        with gzip.GzipFile(self.filename, "wb") as f:
399            f.write(uncompressed)
400
401        def sizes():
402            while True:
403                for n in range(5, 50, 10):
404                    yield n
405
406        with gzip.GzipFile(self.filename, "rb") as f:
407            f.max_read_chunk = 33
408            nread = 0
409            for n in sizes():
410                s = f.peek(n)
411                if s == b'':
412                    break
413                self.assertEqual(f.read(len(s)), s)
414                nread += len(s)
415            self.assertEqual(f.read(100), b'')
416            self.assertEqual(nread, len(uncompressed))
417
418    def test_textio_readlines(self):
419        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
420        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
421        self.test_write()
422        with gzip.GzipFile(self.filename, 'r') as f:
423            with io.TextIOWrapper(f, encoding="ascii") as t:
424                self.assertEqual(t.readlines(), lines)
425
426    def test_fileobj_from_fdopen(self):
427        # Issue #13781: Opening a GzipFile for writing fails when using a
428        # fileobj created with os.fdopen().
429        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
430        with os.fdopen(fd, "wb") as f:
431            with gzip.GzipFile(fileobj=f, mode="w") as g:
432                pass
433
434    def test_bytes_filename(self):
435        str_filename = self.filename
436        try:
437            bytes_filename = str_filename.encode("ascii")
438        except UnicodeEncodeError:
439            self.skipTest("Temporary file name needs to be ASCII")
440        with gzip.GzipFile(bytes_filename, "wb") as f:
441            f.write(data1 * 50)
442        with gzip.GzipFile(bytes_filename, "rb") as f:
443            self.assertEqual(f.read(), data1 * 50)
444        # Sanity check that we are actually operating on the right file.
445        with gzip.GzipFile(str_filename, "rb") as f:
446            self.assertEqual(f.read(), data1 * 50)
447
448    def test_decompress_limited(self):
449        """Decompressed data buffering should be limited"""
450        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
451        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)
452
453        bomb = io.BytesIO(bomb)
454        decomp = gzip.GzipFile(fileobj=bomb)
455        self.assertEqual(decomp.read(1), b'\0')
456        max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
457        self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
458            "Excessive amount of data was decompressed")
459
460    # Testing compress/decompress shortcut functions
461
462    def test_compress(self):
463        for data in [data1, data2]:
464            for args in [(), (1,), (6,), (9,)]:
465                datac = gzip.compress(data, *args)
466                self.assertEqual(type(datac), bytes)
467                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
468                    self.assertEqual(f.read(), data)
469
470    def test_decompress(self):
471        for data in (data1, data2):
472            buf = io.BytesIO()
473            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
474                f.write(data)
475            self.assertEqual(gzip.decompress(buf.getvalue()), data)
476            # Roundtrip with compress
477            datac = gzip.compress(data)
478            self.assertEqual(gzip.decompress(datac), data)
479
480    def test_read_truncated(self):
481        data = data1*50
482        # Drop the CRC (4 bytes) and file size (4 bytes).
483        truncated = gzip.compress(data)[:-8]
484        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
485            self.assertRaises(EOFError, f.read)
486        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
487            self.assertEqual(f.read(len(data)), data)
488            self.assertRaises(EOFError, f.read, 1)
489        # Incomplete 10-byte header.
490        for i in range(2, 10):
491            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
492                self.assertRaises(EOFError, f.read, 1)
493
494    def test_read_with_extra(self):
495        # Gzip data with an extra field
496        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
497                  b'\x05\x00Extra'
498                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
499        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
500            self.assertEqual(f.read(), b'Test')
501
502    def test_prepend_error(self):
503        # See issue #20875
504        with gzip.open(self.filename, "wb") as f:
505            f.write(data1)
506        with gzip.open(self.filename, "rb") as f:
507            f._buffer.raw._fp.prepend()
508
509class TestOpen(BaseTest):
510    def test_binary_modes(self):
511        uncompressed = data1 * 50
512
513        with gzip.open(self.filename, "wb") as f:
514            f.write(uncompressed)
515        with open(self.filename, "rb") as f:
516            file_data = gzip.decompress(f.read())
517            self.assertEqual(file_data, uncompressed)
518
519        with gzip.open(self.filename, "rb") as f:
520            self.assertEqual(f.read(), uncompressed)
521
522        with gzip.open(self.filename, "ab") as f:
523            f.write(uncompressed)
524        with open(self.filename, "rb") as f:
525            file_data = gzip.decompress(f.read())
526            self.assertEqual(file_data, uncompressed * 2)
527
528        with self.assertRaises(FileExistsError):
529            gzip.open(self.filename, "xb")
530        support.unlink(self.filename)
531        with gzip.open(self.filename, "xb") as f:
532            f.write(uncompressed)
533        with open(self.filename, "rb") as f:
534            file_data = gzip.decompress(f.read())
535            self.assertEqual(file_data, uncompressed)
536
537    def test_pathlike_file(self):
538        filename = pathlib.Path(self.filename)
539        with gzip.open(filename, "wb") as f:
540            f.write(data1 * 50)
541        with gzip.open(filename, "ab") as f:
542            f.write(data1)
543        with gzip.open(filename) as f:
544            self.assertEqual(f.read(), data1 * 51)
545
546    def test_implicit_binary_modes(self):
547        # Test implicit binary modes (no "b" or "t" in mode string).
548        uncompressed = data1 * 50
549
550        with gzip.open(self.filename, "w") as f:
551            f.write(uncompressed)
552        with open(self.filename, "rb") as f:
553            file_data = gzip.decompress(f.read())
554            self.assertEqual(file_data, uncompressed)
555
556        with gzip.open(self.filename, "r") as f:
557            self.assertEqual(f.read(), uncompressed)
558
559        with gzip.open(self.filename, "a") as f:
560            f.write(uncompressed)
561        with open(self.filename, "rb") as f:
562            file_data = gzip.decompress(f.read())
563            self.assertEqual(file_data, uncompressed * 2)
564
565        with self.assertRaises(FileExistsError):
566            gzip.open(self.filename, "x")
567        support.unlink(self.filename)
568        with gzip.open(self.filename, "x") as f:
569            f.write(uncompressed)
570        with open(self.filename, "rb") as f:
571            file_data = gzip.decompress(f.read())
572            self.assertEqual(file_data, uncompressed)
573
574    def test_text_modes(self):
575        uncompressed = data1.decode("ascii") * 50
576        uncompressed_raw = uncompressed.replace("\n", os.linesep)
577        with gzip.open(self.filename, "wt") as f:
578            f.write(uncompressed)
579        with open(self.filename, "rb") as f:
580            file_data = gzip.decompress(f.read()).decode("ascii")
581            self.assertEqual(file_data, uncompressed_raw)
582        with gzip.open(self.filename, "rt") as f:
583            self.assertEqual(f.read(), uncompressed)
584        with gzip.open(self.filename, "at") as f:
585            f.write(uncompressed)
586        with open(self.filename, "rb") as f:
587            file_data = gzip.decompress(f.read()).decode("ascii")
588            self.assertEqual(file_data, uncompressed_raw * 2)
589
590    def test_fileobj(self):
591        uncompressed_bytes = data1 * 50
592        uncompressed_str = uncompressed_bytes.decode("ascii")
593        compressed = gzip.compress(uncompressed_bytes)
594        with gzip.open(io.BytesIO(compressed), "r") as f:
595            self.assertEqual(f.read(), uncompressed_bytes)
596        with gzip.open(io.BytesIO(compressed), "rb") as f:
597            self.assertEqual(f.read(), uncompressed_bytes)
598        with gzip.open(io.BytesIO(compressed), "rt") as f:
599            self.assertEqual(f.read(), uncompressed_str)
600
601    def test_bad_params(self):
602        # Test invalid parameter combinations.
603        with self.assertRaises(TypeError):
604            gzip.open(123.456)
605        with self.assertRaises(ValueError):
606            gzip.open(self.filename, "wbt")
607        with self.assertRaises(ValueError):
608            gzip.open(self.filename, "xbt")
609        with self.assertRaises(ValueError):
610            gzip.open(self.filename, "rb", encoding="utf-8")
611        with self.assertRaises(ValueError):
612            gzip.open(self.filename, "rb", errors="ignore")
613        with self.assertRaises(ValueError):
614            gzip.open(self.filename, "rb", newline="\n")
615
616    def test_encoding(self):
617        # Test non-default encoding.
618        uncompressed = data1.decode("ascii") * 50
619        uncompressed_raw = uncompressed.replace("\n", os.linesep)
620        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
621            f.write(uncompressed)
622        with open(self.filename, "rb") as f:
623            file_data = gzip.decompress(f.read()).decode("utf-16")
624            self.assertEqual(file_data, uncompressed_raw)
625        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
626            self.assertEqual(f.read(), uncompressed)
627
628    def test_encoding_error_handler(self):
629        # Test with non-default encoding error handler.
630        with gzip.open(self.filename, "wb") as f:
631            f.write(b"foo\xffbar")
632        with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \
633                as f:
634            self.assertEqual(f.read(), "foobar")
635
636    def test_newline(self):
637        # Test with explicit newline (universal newline mode disabled).
638        uncompressed = data1.decode("ascii") * 50
639        with gzip.open(self.filename, "wt", newline="\n") as f:
640            f.write(uncompressed)
641        with gzip.open(self.filename, "rt", newline="\r") as f:
642            self.assertEqual(f.readlines(), [uncompressed])
643
def test_main(verbose=None):
    """Run the TestGzip and TestOpen suites via support.run_unittest.

    The *verbose* argument is accepted but not used.
    """
    suites = (TestGzip, TestOpen)
    support.run_unittest(*suites)

if __name__ == "__main__":
    test_main(verbose=True)
649