# test_zlib.py -- revision e4163e2c8d1882b7318cbaf1d5ba78c0c7070734
1import unittest
2from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
3import binascii
4import random
5from test.test_support import precisionbigmemtest, _1G
6import sys
7
8try:
9    import mmap
10except ImportError:
11    mmap = None
12
13zlib = import_module('zlib')
14
15
class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        # Omitting the start value is the same as starting from 0, and a
        # nonzero start value yields a nonzero checksum for real input.
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        # crc32 of empty input returns the start value unchanged.
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        # adler32's default start value is 1, unlike crc32's 0.
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        # adler32 of empty input returns the start value unchanged.
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x08000000L) is set.
        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)

    def test_penguins(self):
        # Known checksum values for a fixed input, compared modulo 2**32.
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        # Defaults must match the documented start values (crc32: 0,
        # adler32: 1).
        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        foo = 'abcdefghijklmnop'
        # explicitly test signed behavior
        self.assertEqual(zlib.crc32(foo), -1808088941)
        self.assertEqual(zlib.crc32('spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), -721416943)
        self.assertEqual(zlib.adler32('spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        # zlib.crc32 and binascii.crc32 must agree on the same input
        # (both return the signed 2.x form -- see issue1202).
        foo = 'abcdefghijklmnop'
        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))

    def test_negative_crc_iv_input(self):
        # The range of valid input values for the crc state should be
        # -2**31 through 2**32-1 to allow inputs artifically constrained
        # to a signed 32-bit integer.
        self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
        self.assertEqual(zlib.crc32('spam', -3141593),
                         zlib.crc32('spam',  0xffd01027L))
        self.assertEqual(zlib.crc32('spam', -(2**31)),
                         zlib.crc32('spam',  (2**31)))
73
74
# Issue #10276 - check that inputs of 2 GB are handled correctly.
# Be aware of issues #1202, #8650, #8651 and #10276
class ChecksumBigBufferTestCase(unittest.TestCase):
    # Largest signed 32-bit int; the test buffer is exactly this many
    # bytes so the checksum functions cross the INT_MAX boundary.
    int_max = 0x7FFFFFFF

    @unittest.skipUnless(mmap, "mmap() is not available.")
    def test_big_buffer(self):
        # On platforms where creating the ~2 GB file is expensive, only
        # run when the 'largefile' test resource is enabled.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            requires('largefile',
                     'test requires %s bytes and a long time to run' %
                     str(self.int_max))
        try:
            with open(TESTFN, "wb+") as f:
                # Seek to int_max-4 and write 4 bytes so the file ends up
                # exactly int_max bytes long (sparse where the filesystem
                # supports it).
                f.seek(self.int_max-4)
                f.write("asdf")
                f.flush()
                # mmap the file instead of read()-ing it, so we never hold
                # a ~2 GB string in memory.
                m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
                try:
                    self.assertEqual(zlib.crc32(m), 0x709418e7)
                    self.assertEqual(zlib.adler32(m), -2072837729)
                finally:
                    m.close()
        except (IOError, OverflowError):
            # Sparse/large files unsupported, or the address space is too
            # small to map the file -- skip rather than fail.
            raise unittest.SkipTest("filesystem doesn't have largefile support")
        finally:
            unlink(TESTFN)
101
102
class ExceptionTestCase(unittest.TestCase):
    """Make sure bad arguments raise the expected exceptions."""

    def test_badlevel(self):
        # Compression level 10 is out of range; note that -1 means
        # Z_DEFAULT_COMPRESSION and zlib apparently accepts 0 as well.
        with self.assertRaises(zlib.error):
            zlib.compress('ERROR', 10)

    def test_badcompressobj(self):
        # Building a compress object with bad params must fail.
        with self.assertRaises(ValueError):
            zlib.compressobj(1, zlib.DEFLATED, 0)
        # Specifying total bits too large causes an error.
        with self.assertRaises(ValueError):
            zlib.compressobj(1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # Building a decompress object with bad params must fail.
        with self.assertRaises(ValueError):
            zlib.decompressobj(-1)

    def test_decompressobj_badflush(self):
        # decompressobj.flush() rejects non-positive buffer lengths.
        with self.assertRaises(ValueError):
            zlib.decompressobj().flush(0)
        with self.assertRaises(ValueError):
            zlib.decompressobj().flush(-1)
126
127
class BaseCompressTestCase(object):
    # Mixin with helpers shared by the bigmem compress/decompress tests.
    # The explicit "data = None" rebinds below are deliberate: they drop
    # references to multi-gigabyte buffers as early as possible so the
    # tests stay within the memory budget declared by the bigmem
    # decorators on the callers.

    def check_big_compress_buffer(self, size, compress_func):
        # Feed compress_func a buffer of at least `size` bytes built from
        # 10 MB of random data repeated end to end.
        _1M = 1024 * 1024
        fmt = "%%0%dx" % (2 * _1M)
        # Generate 10MB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        # Round-trip `size` bytes of highly compressible data through
        # decompress_func; the compressed form stays small, so only the
        # decompressed output is "big".
        data = 'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip('x')), 0)
        finally:
            data = None
158
159
class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        # Round-trip a small text through one-shot compress/decompress.
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_incomplete_stream(self):
        # An useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        # Dropping the last byte truncates the stream; the exact zlib
        # error text (Z_BUF_ERROR, -5) is part of the contract tested.
        self.assertRaisesRegexp(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        # One-shot compression of a buffer slightly over 1 GB.
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        # One-shot decompression producing slightly over 1 GB of output.
        self.check_big_decompress_buffer(size, zlib.decompress)
189
190
class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    """Test incremental (de)compression via compressobj()/decompressobj().

    Covers flushing semantics, max_length-limited decompression,
    unconsumed_tail handling, and copy() support where available.
    """

    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12          # negative wbits selects a raw deflate stream
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        # The decompressor must be given the same (raw) wbits setting.
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        # Reuse the joined buffer instead of joining bufs a second time.
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps; cx/dcx are
        # the compression/decompression chunk sizes.
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            # Without a max_length limit, nothing should ever be left in
            # unconsumed_tail.
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain any buffered output by feeding empty input.
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        # Same as test_decompinc, but finish with flush() instead of
        # draining with decompress('').
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            # Each call may return at most dcx bytes; whatever input was
            # not consumed shows up in unconsumed_tail.
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain buffered output in max_length-sized chunks.
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        # Same as test_decompressmaxlen, finishing with flush().
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        # (random is already imported at module level; no local import needed)
        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    # copy() may not exist on all zlib builds; only define the tests when
    # the running zlib supports it.
    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            data0 = HAMLET_SCENE
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = c0.copy()
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = ''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = ''.join(bufs1)

            # The copy must diverge independently from the original.
            self.assertEqual(zlib.decompress(s0),data0+data0)
            self.assertEqual(zlib.decompress(s1),data0+data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            c.flush()
            self.assertRaises(ValueError, c.copy)

    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            data = HAMLET_SCENE
            comp = zlib.compress(data)

            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = d0.copy()
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = ''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = ''.join(bufs1)

            # Both the original and the copy must reproduce the input.
            self.assertEqual(s0,s1)
            self.assertEqual(s0,data)

        def test_baddecompresscopy(self):
            # Test copying a compression object in an inconsistent state
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            d.decompress(data)
            d.flush()
            self.assertRaises(ValueError, d.copy)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        # Incremental compression of a buffer slightly over 1 GB.
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        # Incremental decompression producing slightly over 1 GB.
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)
491
492
def genblock(seed, length, step=1024, generator=random):
    """Return *length* bytes of pseudo-random data derived from *seed*.

    The stream is produced in blocks of *step* bytes; a step that is
    larger than *length* or smaller than 2 collapses to a single block.
    Passing seed=None leaves the generator's state untouched.
    """
    if seed is not None:
        generator.seed(seed)
    rand = generator.randint
    if step > length or step < 2:
        step = length
    blocks = [''.join(chr(rand(0, 255)) for _ in range(step))
              for _ in range(0, length, step)]
    return ''.join(blocks)[:length]
505
506
507
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of *number* lines chosen at random from *source*.

    Passing seed=None leaves the generator's state untouched.
    """
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    pick = generator.choice
    return [pick(candidates) for _ in range(number)]
514
515
516
# Sample text (Hamlet, Act I Scene III) used as compression input
# throughout these tests.  The content is arbitrary; only its length
# and natural-language redundancy matter.
HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""
580
581
def test_main():
    """Run every test case defined in this module."""
    test_classes = (
        ChecksumTestCase,
        ChecksumBigBufferTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase,
    )
    run_unittest(*test_classes)
590
# Allow running this test module directly.
if __name__ == "__main__":
    test_main()
593