# test_zlib.py revision f359410ce22749da69b8adb5a3c23f78876292c6
1import unittest
2from test import test_support
3import zlib
4import random
5
6# print test_support.TESTFN
7
def getbuf():
    """Return a repeatable data buffer: this module's own source, times 8.

    This was in the original.  Avoid non-repeatable sources.
    Left here (unused) in case something wants to be done with it.
    """
    import imp
    try:
        # find_module returns (file, pathname, description); keep the open file.
        fp = imp.find_module('test_zlib')[0]
    except ImportError:
        fp = open(__file__)
    try:
        # Multiply to get a buffer large enough to be worth compressing.
        buf = fp.read() * 8
    finally:
        # Close even if read() fails (the original leaked the handle then),
        # and avoid shadowing the 'file' builtin.
        fp.close()
    return buf
20
21
22
23class ChecksumTestCase(unittest.TestCase):
24    # checksum test cases
25    def test_crc32start(self):
26        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
27
28    def test_crc32empty(self):
29        self.assertEqual(zlib.crc32("", 0), 0)
30        self.assertEqual(zlib.crc32("", 1), 1)
31        self.assertEqual(zlib.crc32("", 432), 432)
32
33    def test_adler32start(self):
34        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
35
36    def test_adler32empty(self):
37        self.assertEqual(zlib.adler32("", 0), 0)
38        self.assertEqual(zlib.adler32("", 1), 1)
39        self.assertEqual(zlib.adler32("", 432), 432)
40
41    def assertEqual32(self, seen, expected):
42        # 32-bit values masked -- checksums on 32- vs 64- bit machines
43        # This is important if bit 31 (0x08000000L) is set.
44        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
45
46    def test_penguins(self):
47        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
48        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
49        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
50        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
51
52        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
53        self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
54
55
56
class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors

    def test_bigbits(self):
        # specifying total bits too large causes an error
        bad_args = ('ERROR', zlib.MAX_WBITS + 1)
        self.assertRaises(zlib.error, zlib.compress, *bad_args)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params:
        # zero window bits is out of range
        self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params:
        # zero window bits is out of range
        self.assertRaises(ValueError, zlib.decompressobj, 0)
71
72
73
class CompressTestCase(unittest.TestCase):
    # Test compression in one go (whole message compression)

    def _check_roundtrip(self, data):
        # decompress(compress(data)) better be data
        self.assertEqual(zlib.decompress(zlib.compress(data)), data)

    def test_speech(self):
        # round-trip a single copy of the sample text
        self._check_roundtrip(hamlet_scene)

    def test_speech8(self):
        # round-trip -- more repetition gives more compression chances
        self._check_roundtrip(hamlet_scene * 8)

    def test_speech16(self):
        # round-trip -- more repetition gives more compression chances
        self._check_roundtrip(hamlet_scene * 16)

    def test_speech128(self):
        # round-trip -- more repetition gives more compression chances
        self._check_roundtrip(hamlet_scene * 8 * 16)

    def test_monotonic(self):
        # higher compression levels should not expand compressed size
        data = hamlet_scene * 8 * 16
        last = len(zlib.compress(data, 0))
        # level 0 stores the data uncompressed plus framing overhead
        self.failUnless(last > len(data), "compress level 0 always expands")
        # The original looped from level 0, comparing level 0 against itself
        # and producing a "level -1" failure message; start at 1 instead so
        # each pass compares level N against level N-1.
        for level in range(1, 10):
            length = len(zlib.compress(data, level))
            self.failUnless(length <= last,
                            'compress level %d more effective than %d!' % (
                                            level-1, level))
            last = length
110
111
112
class CompressObjectTestCase(unittest.TestCase):
    """Tests of zlib.compressobj()/decompressobj() incremental operation.

    Most tests drive raw deflate streams (negative wbits) in chunks,
    exercising flush() semantics, the unconsumed_tail attribute, and the
    max_length argument to decompress().  compressobj argument order is
    (level, method, wbits, memLevel, strategy).
    """
    # Test compression object
    def test_pairsmall(self):
        # use compress object in straightforward manner, decompress w/ object
        data = hamlet_scene
        co = zlib.compressobj(8, 8, -15)
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_pair(self):
        # straightforward compress/decompress objects, more compression
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(8, 8, -15)
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = hamlet_scene * 8 * 16
        # level 2, method 8 (deflated), raw stream wbits -12, memLevel 9
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        # NOTE(review): combuf is assigned but never used below; the join
        # is recomputed on the decompress line.
        combuf = ''.join(bufs)

        # wbits -15 decodes the -12 raw stream (window only needs to be >=
        # the one used for compression)
        dco = zlib.decompressobj(-15)
        y1 = dco.decompress(''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompressincremental(self):
        # compress object in steps, decompress object in steps
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        # sanity check: one-shot decompress of the combined stream
        # (third argument is the initial bufsize; -5 here -- presumably
        # exercising an odd-but-accepted value, TODO confirm)
        self.assertEqual(data, zlib.decompress(combuf, -12, -5))

        dco = zlib.decompressobj(-12)
        bufs = []
        for i in range(0, len(combuf), 128):
            bufs.append(dco.decompress(combuf[i:i+128]))
            # without max_length, nothing should ever be left unconsumed
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                                           len(dco.unconsumed_tail))
        bufs.append(dco.flush())
        self.assertEqual('', dco.unconsumed_tail, ########
                             "(B) uct should be '': not %d long" %
                                           len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    # NOTE: the mutable default sizes=[128] is never mutated, only iterated.
    def test_decompinc(self,sizes=[128],flush=True,source=None,cx=256,dcx=64):
        # compress object in steps, decompress object in steps, loop sizes
        source = source or hamlet_scene
        for reps in sizes:
            data = source * reps
            co = zlib.compressobj(2, 8, -12, 9, 1)
            bufs = []
            for i in range(0, len(data), cx):
                bufs.append(co.compress(data[i:i+cx]))
            bufs.append(co.flush())
            combuf = ''.join(bufs)

            self.assertEqual(data, zlib.decompress(combuf, -12, -5))

            dco = zlib.decompressobj(-12)
            bufs = []
            for i in range(0, len(combuf), dcx):
                bufs.append(dco.decompress(combuf[i:i+dcx]))
                self.assertEqual('', dco.unconsumed_tail, ########
                                 "(A) uct should be '': not %d long" %
                                           len(dco.unconsumed_tail))
            if flush:
                bufs.append(dco.flush())
            else:
                # flush-free variant: drain with empty-input decompress calls
                while True:
                    chunk = dco.decompress('')
                    if chunk:
                        bufs.append(chunk)
                    else:
                        break
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(B) uct should be '': not %d long" %
                                           len(dco.unconsumed_tail))
            self.assertEqual(data, ''.join(bufs))
            # Failure means: "decompressobj with init options failed"

    def test_decompimax(self,sizes=[128],flush=True,source=None,cx=256,dcx=64):
        # compress in steps, decompress in length-restricted steps, loop sizes
        source = source or hamlet_scene
        for reps in sizes:
            # Check a decompression object with max_length specified
            data = source * reps
            co = zlib.compressobj(2, 8, -12, 9, 1)
            bufs = []
            for i in range(0, len(data), cx):
                bufs.append(co.compress(data[i:i+cx]))
            bufs.append(co.flush())
            combuf = ''.join(bufs)
            self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                             'compressed data failure')

            dco = zlib.decompressobj(-12)
            bufs = []
            cb = combuf
            while cb:
                #max_length = 1 + len(cb)//10
                # each decompress() call may return at most dcx bytes; the
                # rest of the input is handed back via unconsumed_tail
                chunk = dco.decompress(cb, dcx)
                self.failIf(len(chunk) > dcx,
                        'chunk too big (%d>%d)' % (len(chunk), dcx))
                bufs.append(chunk)
                cb = dco.unconsumed_tail
            if flush:
                bufs.append(dco.flush())
            else:
                # drain buffered output without calling flush()
                while True:
                    chunk = dco.decompress('', dcx)
                    self.failIf(len(chunk) > dcx,
                        'chunk too big in tail (%d>%d)' % (len(chunk), dcx))
                    if chunk:
                        bufs.append(chunk)
                    else:
                        break
            self.assertEqual(len(data), len(''.join(bufs)))
            self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self):
        # Check a decompression object with max_length specified
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                         'compressed data failure')

        dco = zlib.decompressobj(-12)
        bufs = []
        cb = combuf
        while cb:
            # cap each step at ~10% of what remains (never zero)
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(len(data), len(''.join(bufs)))
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflushless(self):
        # identical to test_decompressmaxlen except flush is replaced
        # with an equivalent.  This works and other fails on (eg) 2.2.2
        data = hamlet_scene * 8 * 16
        co = zlib.compressobj(2, 8, -12, 9, 1)
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf, -12, -5),
                         'compressed data mismatch')

        dco = zlib.decompressobj(-12)
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail

        #bufs.append(dco.flush())
        # drain remaining buffered output via empty-input decompress calls;
        # reuses 'chunk' (and 'max_length') from the loop above, so relies
        # on that loop having run at least once
        while len(chunk):
            chunk = dco.decompress('', max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)

        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj(-12)
        # a negative max_length is rejected outright
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        # only use the flush modes this zlib build actually exposes
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = hamlet_scene * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(9)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_manydecompinc(self):
        # Run incremental decompress test for a large range of sizes
        self.test_decompinc(sizes=[1<<n for n in range(8)],
                             flush=True, cx=32, dcx=4)

    def test_manydecompimax(self):
        # Run incremental decompress maxlen test for a large range of sizes
        # avoid the flush bug
        self.test_decompimax(sizes=[1<<n for n in range(8)],
                             flush=False, cx=32, dcx=4)

    def test_manydecompimaxflush(self):
        # Run incremental decompress maxlen test for a large range of sizes
        # avoid the flush bug
        self.test_decompimax(sizes=[1<<n for n in range(8)],
                             flush=True, cx=32, dcx=4)
391
def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks).

    seed      -- passed to generator.seed() unless None (stream continues)
    length    -- total number of bytes to return; <= 0 yields ''
    step      -- block size used while generating
    generator -- any object with seed()/randint() (default: random module)
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # The original fell through with step forced to 0, and
        # range(0, 0, 0) raises ValueError; return empty instead.
        return ''
    randint = generator.randint   # hoist the bound-method lookup out of the loop
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0,255))
                               for x in range(step)]))
    # The final block may overshoot; trim to the exact length.
    return ''.join(blocks)[:length]
404
405
406
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    pool = source.split('\n')
    pick = generator.choice
    return [pick(pool) for n in range(number)]
413
414
415
# Sample text used as compressible test data throughout this module
# (Hamlet, Act I Scene III).
hamlet_scene = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""
479
480
def test_main():
    """Build a suite containing all four test case classes and run it."""
    cases = (ChecksumTestCase, ExceptionTestCase,
             CompressTestCase, CompressObjectTestCase)
    suite = unittest.TestSuite()
    for case in cases:
        suite.addTest(unittest.makeSuite(case))
    test_support.run_suite(suite)

if __name__ == "__main__":
    test_main()
491
def test(tests=''):
    """Run a chosen subset of the test classes.

    tests is a string of flag letters: 'k' checksums, 'x' exceptions,
    'c' one-shot compression, 'o' compression objects (the default).
    """
    if not tests:
        tests = 'o'
    flag_map = (('k', ChecksumTestCase),
                ('x', ExceptionTestCase),
                ('c', CompressTestCase),
                ('o', CompressObjectTestCase))
    suite = unittest.TestSuite()
    for letter, case in flag_map:
        if letter in tests:
            suite.addTest(unittest.makeSuite(case))
    test_support.run_suite(suite)
500
# Deliberately disabled scratch snippet: an interactive recipe for running
# just the one-shot compression tests against a specific source checkout.
# Kept for reference only; never executed.
if False:
    import sys
    sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test')
    import test_zlib as tz
    ts, ut = tz.test_support, tz.unittest
    su = ut.TestSuite()
    su.addTest(ut.makeSuite(tz.CompressTestCase))
    ts.run_suite(su)
509