1import unittest
2from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
3import binascii
4import random
5from test.test_support import precisionbigmemtest, _1G, _4G
6import sys
7
8try:
9    import mmap
10except ImportError:
11    mmap = None
12
13zlib = import_module('zlib')
14
15
class ChecksumTestCase(unittest.TestCase):
    """Tests for the zlib.crc32() and zlib.adler32() checksum functions."""

    def test_crc32start(self):
        # Leaving out the start value must behave like a start value of 0.
        implicit = zlib.crc32("")
        explicit = zlib.crc32("", 0)
        self.assertEqual(implicit, explicit)
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        # With no data to fold in, crc32 hands back the start value as is.
        for start in (0, 1, 432):
            self.assertEqual(zlib.crc32("", start), start)

    def test_adler32start(self):
        # Leaving out the start value must behave like a start value of 1.
        implicit = zlib.adler32("")
        explicit = zlib.adler32("", 1)
        self.assertEqual(implicit, explicit)
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        # With no data to fold in, adler32 hands back the start value as is.
        for start in (0, 1, 432):
            self.assertEqual(zlib.adler32("", start), start)

    def assertEqual32(self, seen, expected):
        # Compare only the low 32 bits of both values, so the comparison
        # does not depend on how a 32- or 64-bit machine represents the
        # checksum.  This matters whenever bit 31 (0x80000000) is set.
        mask = 0xFFFFFFFF
        self.assertEqual(seen & mask, expected & mask)

    def test_penguins(self):
        # Known-good checksums of "penguin" for start values 0 and 1.
        known = (
            (zlib.crc32, 0, 0xe5c1a120),
            (zlib.crc32, 1, 0x43b6aa94),
            (zlib.adler32, 0, 0x0bcf02f6),
            (zlib.adler32, 1, 0x0bd602f7),
        )
        for checksum, start, expected in known:
            self.assertEqual32(checksum("penguin", start), expected)

        # The default start value is 0 for crc32 and 1 for adler32.
        for checksum, default_start in ((zlib.crc32, 0), (zlib.adler32, 1)):
            self.assertEqual(checksum("penguin"),
                             checksum("penguin", default_start))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        alphabet = 'abcdefghijklmnop'
        # The expected values are deliberately signed (some are negative).
        signed_cases = (
            (zlib.crc32, alphabet, -1808088941),
            (zlib.crc32, 'spam', 1138425661),
            (zlib.adler32, alphabet + alphabet, -721416943),
            (zlib.adler32, 'spam', 72286642),
        )
        for checksum, text, expected in signed_cases:
            self.assertEqual(checksum(text), expected)

    def test_same_as_binascii_crc32(self):
        # binascii.crc32 and zlib.crc32 must agree on the same input.
        for text in ('abcdefghijklmnop', 'spam'):
            self.assertEqual(binascii.crc32(text), zlib.crc32(text))

    def test_negative_crc_iv_input(self):
        # The range of valid input values for the crc state should be
        # -2**31 through 2**32-1, to allow inputs artificially constrained
        # to a signed 32-bit integer.  Each negative start value must give
        # the same result as its unsigned 32-bit counterpart.
        equivalent_starts = (
            ('ham', -1, 0xffffffff),
            ('spam', -3141593, 0xffd01027),
            ('spam', -(2**31), (2**31)),
        )
        for text, signed_start, unsigned_start in equivalent_starts:
            self.assertEqual(zlib.crc32(text, signed_start),
                             zlib.crc32(text, unsigned_start))
73
74
class ExceptionTestCase(unittest.TestCase):
    """Check that invalid arguments produce the expected exceptions."""

    def test_badlevel(self):
        # A compression level outside the valid range is an error
        # (-1 is Z_DEFAULT_COMPRESSION, and zlib apparently accepts 0 too,
        # so 10 is used as the out-of-range value).
        with self.assertRaises(zlib.error):
            zlib.compress('ERROR', 10)

    def test_badcompressobj(self):
        # Building a compress object must fail for a window size of 0 and
        # for one that is larger than the maximum zlib supports.
        for bad_wbits in (0, zlib.MAX_WBITS + 1):
            with self.assertRaises(ValueError):
                zlib.compressobj(1, zlib.DEFLATED, bad_wbits)

    def test_baddecompressobj(self):
        # Building a decompress object with bad parameters must fail.
        with self.assertRaises(ValueError):
            zlib.decompressobj(-1)

    def test_decompressobj_badflush(self):
        # decompressobj.flush() must reject a zero or negative length;
        # a fresh decompress object is used for every attempt.
        for bad_length in (0, -1):
            flush = zlib.decompressobj().flush
            with self.assertRaises(ValueError):
                flush(bad_length)
98
99
class BaseCompressTestCase(object):
    """Mixin with shared helpers for the large-buffer (bigmem) tests."""

    def check_big_compress_buffer(self, size, compress_func):
        """Call compress_func on more than size bytes of poorly compressible data.

        The return value of compress_func is ignored; the check is only
        that the call completes without raising.
        """
        megabyte = 1024 * 1024
        hex_format = "%%0%dx" % (2 * megabyte)
        # Generate 10MB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        pieces = []
        for _ in range(10):
            random_bits = random.getrandbits(8 * megabyte)
            pieces.append(binascii.a2b_hex(hex_format % random_bits))
        data = ''.join(pieces)
        # Drop the pieces right away so only the joined copy stays alive.
        del pieces
        repeats = size // len(data) + 1
        data = data * repeats
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        """Compress size bytes of 'x' and verify decompress_func restores them."""
        plain = 'x' * size
        try:
            compressed = zlib.compress(plain, 1)
        finally:
            # Release memory
            plain = None
        restored = decompress_func(compressed)
        # Sanity check: right length, and nothing but 'x' characters.
        try:
            self.assertEqual(len(restored), size)
            self.assertEqual(len(restored.strip('x')), 0)
        finally:
            restored = None
130
131
class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    """One-shot (whole message) tests of zlib.compress()/zlib.decompress()."""

    def test_speech(self):
        # Round-trip the sample text once.
        compressed = zlib.compress(HAMLET_SCENE)
        restored = zlib.decompress(compressed)
        self.assertEqual(restored, HAMLET_SCENE)

    def test_speech128(self):
        # Round-trip a larger payload: the sample text repeated 128 times.
        payload = HAMLET_SCENE * 128
        compressed = zlib.compress(payload)
        restored = zlib.decompress(compressed)
        self.assertEqual(restored, payload)

    def test_incomplete_stream(self):
        # A useful error message is given when the last byte is missing.
        truncated = zlib.compress(HAMLET_SCENE)[:-1]
        with self.assertRaisesRegexp(zlib.error,
                "Error -5 while decompressing data: incomplete or truncated stream"):
            zlib.decompress(truncated)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        def fast_compress(data):
            # Level 1 keeps the run time of this very large test down.
            return zlib.compress(data, 1)
        self.check_big_compress_buffer(size, fast_compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)
161
162
class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    """Tests for the streaming zlib.compressobj()/zlib.decompressobj() API."""

    def _compress_in_steps(self, data, step):
        """Compress data with a default compressobj, step bytes per call.

        Returns the complete compressed stream (all compress() outputs
        followed by the final flush()) as a single string.
        """
        co = zlib.compressobj()
        bufs = [co.compress(data[i:i+step])
                for i in range(0, len(data), step)]
        bufs.append(co.flush())
        return ''.join(bufs)

    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        # The decompressor has to be created with the same wbits value.
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        combuf = self._compress_in_steps(data, 256)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        combuf = self._compress_in_steps(data, cx)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            # No max_length was given, so nothing may be left unconsumed.
            self.assertEqual('', dco.unconsumed_tail,
                             "(A) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain the decompressor by feeding it empty input until it
            # stops producing output.
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail,
                         "(B) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        combuf = self._compress_in_steps(data, cx)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            # Input that did not fit within max_length is handed back here
            # and has to be fed in again on the next call.
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        combuf = self._compress_in_steps(data, 256)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            # The limit shrinks together with the remaining input.
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain whatever output is still pending, keeping the last
            # max_length as the limit, until an empty chunk is returned.
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = "x\x9cKLJ\x06\x00\x02M\x01"     # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, "")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        # Only exercise the flush modes this zlib build actually exposes.
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = 'abcdefghijklmnopqrstuvwxyz'
        input2 = 'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        # Allocate a different compressed string after the first one was
        # dropped; flush() must still return the rest of input1.
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    # The copy tests are only defined when compress objects have copy().
    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            data0 = HAMLET_SCENE
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            # Fork the compressor; both streams share the same prefix.
            c1 = c0.copy()
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = ''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = ''.join(bufs1)

            self.assertEqual(zlib.decompress(s0),data0+data0)
            self.assertEqual(zlib.decompress(s1),data0+data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            # (here: after it has already been flushed).
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            c.flush()
            self.assertRaises(ValueError, c.copy)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        # maxlen == 0 means "call decompress() without a max_length".
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        # Still inside the compressed stream.
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # The copy tests are only defined when decompress objects have copy().
    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            data = HAMLET_SCENE
            comp = zlib.compress(data)

            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            # Fork the decompressor after the first 32 compressed bytes.
            d1 = d0.copy()
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = ''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = ''.join(bufs1)

            self.assertEqual(s0,s1)
            self.assertEqual(s0,data)

        def test_baddecompresscopy(self):
            # Test copying a decompression object in an inconsistent state
            # (here: after it has already been flushed).
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            d.decompress(data)
            d.flush()
            self.assertRaises(ValueError, d.copy)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)
509
510
def genblock(seed, length, step=1024, generator=random):
    """Return a length-character string of random data, built in blocks.

    seed      -- passed to generator.seed() first, unless it is None.
    length    -- number of characters to return; zero or less gives ''.
    step      -- block size used while generating.  It is replaced by
                 length when length is smaller than step or step is below 2.
    generator -- object providing seed() and randint(); defaults to the
                 random module.

    Whole blocks are generated and the result is cut to length, so the
    number of random draws is always a multiple of the block size.
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # Nothing to generate.  A length of exactly 0 used to force step to
        # 0 as well, which made range() raise ValueError below.
        return ''
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = [''.join([chr(randint(0, 255)) for _byte in range(step)])
              for _start in range(0, length, step)]
    return ''.join(blocks)[:length]
523
524
525
def choose_lines(source, number, seed=None, generator=random):
    """Pick number lines at random (with repetition) from source.

    source is split on newline characters; when seed is not None the
    generator is seeded with it first.  The picks are returned as a list.
    """
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    picked = []
    for _ in range(number):
        picked.append(generator.choice(candidates))
    return picked
532
533
534
# Sample text (a scene from Hamlet) used as the payload of the
# compression round-trip tests in this module.
HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""
598
599
def test_main():
    """Run all four test case classes of this module through run_unittest()."""
    test_classes = (
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase,
    )
    run_unittest(*test_classes)
607
# Run the whole test suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
610