# test_zlib.py revision fc3bfad2e0376ea5aa8a0c5352aafd3d731aaad7
import unittest
from test import test_support
import binascii
import random
from test.test_support import precisionbigmemtest, _1G

zlib = test_support.import_module('zlib')


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
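    # Note: in the 2.x series zlib.crc32() and zlib.adler32() return signed
    # ints (see issue1202, tested below); assertEqual32() masks both sides
    # with 0xffffffff so comparisons are over the raw 32-bit value
    # regardless of sign.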
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64-bit machines
        # This is important if bit 31 (0x80000000L) is set.
        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        foo = 'abcdefghijklmnop'
        # explicitly test signed behavior
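        # (masked to 32 bits, -1808088941 is 0x943ac093 and -721416943
        # is 0xd5000d11)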
        self.assertEqual(zlib.crc32(foo), -1808088941)
        self.assertEqual(zlib.crc32('spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), -721416943)
        self.assertEqual(zlib.adler32('spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = 'abcdefghijklmnop'
        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))

    def test_negative_crc_iv_input(self):
        # The range of valid input values for the crc state should be
        # -2**31 through 2**32-1 to allow inputs artificially constrained
        # to a signed 32-bit integer.
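        # Each negative iv is expected to behave like the unsigned value it
        # is congruent to mod 2**32, e.g. -1 maps to 0xffffffff and
        # -3141593 to 0xffd01027.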
        self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
        self.assertEqual(zlib.crc32('spam', -3141593),
                         zlib.crc32('spam',  0xffd01027L))
        self.assertEqual(zlib.crc32('spam', -(2**31)),
                         zlib.crc32('spam',  (2**31)))


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        fmt = "%%0%dx" % (2 * _1M)
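        # fmt formats an (8 * _1M)-bit random integer as 2 * _1M zero-padded
        # hex digits, which a2b_hex() then turns into _1M bytes of data.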
        # Generate 10 MB worth of random data and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread-out redundancy.
        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
                        for i in range(10)])
        data = data * (size // len(data) + 1)
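        # repeat the 10 MB block enough times that the buffer is at least
        # size bytes long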
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = 'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip('x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegexp(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
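        # a negative wbits selects a raw deflate stream (no zlib header or
        # trailing checksum); the decompressobj below is given the same
        # value so it can read the data back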
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
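        # Any output still buffered inside the decompressor can be drained
        # either with a single flush() or by calling decompress('') until it
        # returns an empty string; both paths are exercised below.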
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
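            # input that could not be consumed because output was capped at
            # dcx bytes is kept in unconsumed_tail and fed back next pass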
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
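        # Z_SYNC_FLUSH flushes pending output to a byte boundary mid-stream;
        # Z_FULL_FLUSH additionally resets the compression state.  Z_NO_FLUSH
        # is the default "keep buffering" mode.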
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            data0 = HAMLET_SCENE
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = c0.copy()
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = ''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = ''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0 + data0)
            self.assertEqual(zlib.decompress(s1), data0 + data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            c.flush()
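            # after flush() the stream is finished, so copying is expected
            # to fail with ValueError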
            self.assertRaises(ValueError, c.copy)

    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            data = HAMLET_SCENE
            comp = zlib.compress(data)

            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = d0.copy()
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = ''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = ''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

        def test_baddecompresscopy(self):
            # Test copying a decompression object in an inconsistent state
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            d.decompress(data)
            d.flush()
            self.assertRaises(ValueError, d.copy)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0, 255))
                               for x in range(step)]))
    return ''.join(blocks)[:length]



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]



HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


def test_main():
    test_support.run_unittest(
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()