test_zlib.py revision c856fa811dfdd6d17396d9aa522eea19b0f46c93
1import unittest
2from test import test_support
3import zlib
4import binascii
5import random
6
7
8class ChecksumTestCase(unittest.TestCase):
9    # checksum test cases
10    def test_crc32start(self):
11        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
12        self.assert_(zlib.crc32("abc", 0xffffffff))
13
14    def test_crc32empty(self):
15        self.assertEqual(zlib.crc32("", 0), 0)
16        self.assertEqual(zlib.crc32("", 1), 1)
17        self.assertEqual(zlib.crc32("", 432), 432)
18
19    def test_adler32start(self):
20        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
21        self.assert_(zlib.adler32("abc", 0xffffffff))
22
23    def test_adler32empty(self):
24        self.assertEqual(zlib.adler32("", 0), 0)
25        self.assertEqual(zlib.adler32("", 1), 1)
26        self.assertEqual(zlib.adler32("", 432), 432)
27
28    def assertEqual32(self, seen, expected):
29        # 32-bit values masked -- checksums on 32- vs 64- bit machines
30        # This is important if bit 31 (0x08000000L) is set.
31        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
32
33    def test_penguins(self):
34        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
35        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
36        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
37        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
38
39        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
40        self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
41
42    def test_abcdefghijklmnop(self):
43        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
44        foo = 'abcdefghijklmnop'
45        # explicitly test signed behavior
46        self.assertEqual(zlib.crc32(foo), -1808088941)
47        self.assertEqual(zlib.crc32('spam'), 1138425661)
48        self.assertEqual(zlib.adler32(foo+foo), -721416943)
49        self.assertEqual(zlib.adler32('spam'), 72286642)
50
51    def test_same_as_binascii_crc32(self):
52        foo = 'abcdefghijklmnop'
53        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
54        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))
55
56
57
58class ExceptionTestCase(unittest.TestCase):
59    # make sure we generate some expected errors
60    def test_badlevel(self):
61        # specifying compression level out of range causes an error
62        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
63        # accepts 0 too)
64        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)
65
66    def test_badcompressobj(self):
67        # verify failure on building compress object with bad params
68        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
69        # specifying total bits too large causes an error
70        self.assertRaises(ValueError,
71                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)
72
73    def test_baddecompressobj(self):
74        # verify failure on building decompress object with bad params
75        self.assertRaises(ValueError, zlib.decompressobj, 0)
76
77
78
class CompressTestCase(unittest.TestCase):
    """One-shot (whole message) compression round trips."""

    def test_speech(self):
        # The scene must survive compress() followed by decompress().
        packed = zlib.compress(HAMLET_SCENE)
        restored = zlib.decompress(packed)
        self.assertEqual(restored, HAMLET_SCENE)

    def test_speech128(self):
        # Same round trip on more data: 128 copies of the scene.
        big_text = HAMLET_SCENE * 128
        restored = zlib.decompress(zlib.compress(big_text))
        self.assertEqual(restored, big_text)
90
91
92
93
class CompressObjectTestCase(unittest.TestCase):
    """Tests for the zlib.compressobj() / zlib.decompressobj() objects."""

    def _compress_in_steps(self, data, step):
        # Helper (not a test): feed data to a fresh compression object
        # step bytes at a time and return the complete compressed stream.
        co = zlib.compressobj()
        bufs = [co.compress(data[i:i+step])
                for i in range(0, len(data), step)]
        bufs.append(co.flush())
        return ''.join(bufs)

    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        # the decompressor must be created with the same wbits
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        combuf = self._compress_in_steps(data, 256)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        #   flush  -- finish with dco.flush() instead of draining the
        #             object with decompress('') calls
        #   source -- text to use (HAMLET_SCENE when not given)
        #   cx/dcx -- step size for compression / decompression
        source = source or HAMLET_SCENE
        data = source * 128
        combuf = self._compress_in_steps(data, cx)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            # no max_length was given, so nothing may be left unconsumed
            self.assertEqual('', dco.unconsumed_tail,
                             "(A) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            # drain the object until it has nothing more to return
            chunk = dco.decompress('')
            while chunk:
                bufs.append(chunk)
                chunk = dco.decompress('')
        self.assertEqual('', dco.unconsumed_tail,
                         "(B) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        #   source -- text to use (HAMLET_SCENE when not given)
        #   cx     -- step size for compression
        #   dcx    -- max_length passed to every decompress() call
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        combuf = self._compress_in_steps(data, cx)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            # input that did not fit into max_length is fed in again
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        combuf = self._compress_in_steps(data, 256)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            # the limit shrinks together with the remaining input
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # keep draining, still using the last max_length, until an
            # empty chunk comes back
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        # only use the flush modes this build of the module provides
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if not hasattr(zlib, 'Z_SYNC_FLUSH'):
            # nothing to test without sync flush support
            return

        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    # The copy() tests are only defined when the objects provide copy().
    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            data0 = HAMLET_SCENE
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            # fork the stream: both objects share the output so far
            c1 = c0.copy()
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = ''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = ''.join(bufs1)

            self.assertEqual(zlib.decompress(s0),data0+data0)
            self.assertEqual(zlib.decompress(s1),data0+data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            c.flush()
            self.assertRaises(ValueError, c.copy)

    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            data = HAMLET_SCENE
            comp = zlib.compress(data)

            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            # fork the stream: both objects share the output so far
            d1 = d0.copy()
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = ''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = ''.join(bufs1)

            self.assertEqual(s0,s1)
            self.assertEqual(s0,data)

        def test_baddecompresscopy(self):
            # Test copying a decompression object in an inconsistent state
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            d.decompress(data)
            d.flush()
            self.assertRaises(ValueError, d.copy)
367
def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks).

    seed      -- passed to generator.seed() unless it is None
    length    -- number of characters to return; zero or a negative value
                 gives an empty string
    step      -- block size used while generating; the whole stream is
                 produced as a single block when step is larger than
                 length or smaller than 2
    generator -- object providing seed() and randint(); the random module
                 by default
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # Nothing to generate.  Without this guard a length of 0 would
        # give a step of 0 and make range() below raise ValueError.
        return ''
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    # Whole blocks are always generated; the surplus of the last block is
    # cut off by the final slice.
    for _start in range(0, length, step):
        blocks.append(''.join([chr(randint(0, 255))
                               for _i in range(step)]))
    return ''.join(blocks)[:length]
380
381
382
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source

    The source is split on newlines and generator.choice() is called once
    per requested line, so a line may be picked more than once.  The
    generator is seeded first unless seed is None.
    """
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    pick = generator.choice
    chosen = []
    for _ in range(number):
        chosen.append(pick(candidates))
    return chosen
389
390
391
# Sample text (a scene from Hamlet with Laertes, Polonius and Ophelia) that
# the compression tests in this file use as their input data.
HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""
455
456
def test_main():
    """Run all test case classes of this module via test_support."""
    cases = (
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase,
    )
    test_support.run_unittest(*cases)
464
# Run the whole test module when the file is executed as a script.
if __name__ == "__main__":
    test_main()
467