# test_zlib.py revision f359410ce22749da69b8adb5a3c23f78876292c6
1import unittest 2from test import test_support 3import zlib 4import random 5 6# print test_support.TESTFN 7 8def getbuf(): 9 # This was in the original. Avoid non-repeatable sources. 10 # Left here (unused) in case something wants to be done with it. 11 import imp 12 try: 13 t = imp.find_module('test_zlib') 14 file = t[0] 15 except ImportError: 16 file = open(__file__) 17 buf = file.read() * 8 18 file.close() 19 return buf 20 21 22 23class ChecksumTestCase(unittest.TestCase): 24 # checksum test cases 25 def test_crc32start(self): 26 self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) 27 28 def test_crc32empty(self): 29 self.assertEqual(zlib.crc32("", 0), 0) 30 self.assertEqual(zlib.crc32("", 1), 1) 31 self.assertEqual(zlib.crc32("", 432), 432) 32 33 def test_adler32start(self): 34 self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) 35 36 def test_adler32empty(self): 37 self.assertEqual(zlib.adler32("", 0), 0) 38 self.assertEqual(zlib.adler32("", 1), 1) 39 self.assertEqual(zlib.adler32("", 432), 432) 40 41 def assertEqual32(self, seen, expected): 42 # 32-bit values masked -- checksums on 32- vs 64- bit machines 43 # This is important if bit 31 (0x08000000L) is set. 
44 self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL) 45 46 def test_penguins(self): 47 self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L) 48 self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94) 49 self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6) 50 self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7) 51 52 self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0)) 53 self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1)) 54 55 56 57class ExceptionTestCase(unittest.TestCase): 58 # make sure we generate some expected errors 59 def test_bigbits(self): 60 # specifying total bits too large causes an error 61 self.assertRaises(zlib.error, 62 zlib.compress, 'ERROR', zlib.MAX_WBITS + 1) 63 64 def test_badcompressobj(self): 65 # verify failure on building compress object with bad params 66 self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0) 67 68 def test_baddecompressobj(self): 69 # verify failure on building decompress object with bad params 70 self.assertRaises(ValueError, zlib.decompressobj, 0) 71 72 73 74class CompressTestCase(unittest.TestCase): 75 # Test compression in one go (whole message compression) 76 def test_speech(self): 77 # decompress(compress(data)) better be data 78 x = zlib.compress(hamlet_scene) 79 self.assertEqual(zlib.decompress(x), hamlet_scene) 80 81 def test_speech8(self): 82 # decompress(compress(data)) better be data -- more compression chances 83 data = hamlet_scene * 8 84 x = zlib.compress(data) 85 self.assertEqual(zlib.decompress(x), data) 86 87 def test_speech16(self): 88 # decompress(compress(data)) better be data -- more compression chances 89 data = hamlet_scene * 16 90 x = zlib.compress(data) 91 self.assertEqual(zlib.decompress(x), data) 92 93 def test_speech128(self): 94 # decompress(compress(data)) better be data -- more compression chances 95 data = hamlet_scene * 8 * 16 96 x = zlib.compress(data) 97 self.assertEqual(zlib.decompress(x), data) 98 99 def 
test_monotonic(self): 100 # higher compression levels should not expand compressed size 101 data = hamlet_scene * 8 * 16 102 last = length = len(zlib.compress(data, 0)) 103 self.failUnless(last > len(data), "compress level 0 always expands") 104 for level in range(10): 105 length = len(zlib.compress(data, level)) 106 self.failUnless(length <= last, 107 'compress level %d more effective than %d!' % ( 108 level-1, level)) 109 last = length 110 111 112 113class CompressObjectTestCase(unittest.TestCase): 114 # Test compression object 115 def test_pairsmall(self): 116 # use compress object in straightforward manner, decompress w/ object 117 data = hamlet_scene 118 co = zlib.compressobj(8, 8, -15) 119 x1 = co.compress(data) 120 x2 = co.flush() 121 self.assertRaises(zlib.error, co.flush) # second flush should not work 122 dco = zlib.decompressobj(-15) 123 y1 = dco.decompress(x1 + x2) 124 y2 = dco.flush() 125 self.assertEqual(data, y1 + y2) 126 127 def test_pair(self): 128 # straightforward compress/decompress objects, more compression 129 data = hamlet_scene * 8 * 16 130 co = zlib.compressobj(8, 8, -15) 131 x1 = co.compress(data) 132 x2 = co.flush() 133 self.assertRaises(zlib.error, co.flush) # second flush should not work 134 dco = zlib.decompressobj(-15) 135 y1 = dco.decompress(x1 + x2) 136 y2 = dco.flush() 137 self.assertEqual(data, y1 + y2) 138 139 def test_compressincremental(self): 140 # compress object in steps, decompress object as one-shot 141 data = hamlet_scene * 8 * 16 142 co = zlib.compressobj(2, 8, -12, 9, 1) 143 bufs = [] 144 for i in range(0, len(data), 256): 145 bufs.append(co.compress(data[i:i+256])) 146 bufs.append(co.flush()) 147 combuf = ''.join(bufs) 148 149 dco = zlib.decompressobj(-15) 150 y1 = dco.decompress(''.join(bufs)) 151 y2 = dco.flush() 152 self.assertEqual(data, y1 + y2) 153 154 def test_decompressincremental(self): 155 # compress object in steps, decompress object in steps 156 data = hamlet_scene * 8 * 16 157 co = zlib.compressobj(2, 8, 
-12, 9, 1) 158 bufs = [] 159 for i in range(0, len(data), 256): 160 bufs.append(co.compress(data[i:i+256])) 161 bufs.append(co.flush()) 162 combuf = ''.join(bufs) 163 164 self.assertEqual(data, zlib.decompress(combuf, -12, -5)) 165 166 dco = zlib.decompressobj(-12) 167 bufs = [] 168 for i in range(0, len(combuf), 128): 169 bufs.append(dco.decompress(combuf[i:i+128])) 170 self.assertEqual('', dco.unconsumed_tail, ######## 171 "(A) uct should be '': not %d long" % 172 len(dco.unconsumed_tail)) 173 bufs.append(dco.flush()) 174 self.assertEqual('', dco.unconsumed_tail, ######## 175 "(B) uct should be '': not %d long" % 176 len(dco.unconsumed_tail)) 177 self.assertEqual(data, ''.join(bufs)) 178 # Failure means: "decompressobj with init options failed" 179 180 def test_decompinc(self,sizes=[128],flush=True,source=None,cx=256,dcx=64): 181 # compress object in steps, decompress object in steps, loop sizes 182 source = source or hamlet_scene 183 for reps in sizes: 184 data = source * reps 185 co = zlib.compressobj(2, 8, -12, 9, 1) 186 bufs = [] 187 for i in range(0, len(data), cx): 188 bufs.append(co.compress(data[i:i+cx])) 189 bufs.append(co.flush()) 190 combuf = ''.join(bufs) 191 192 self.assertEqual(data, zlib.decompress(combuf, -12, -5)) 193 194 dco = zlib.decompressobj(-12) 195 bufs = [] 196 for i in range(0, len(combuf), dcx): 197 bufs.append(dco.decompress(combuf[i:i+dcx])) 198 self.assertEqual('', dco.unconsumed_tail, ######## 199 "(A) uct should be '': not %d long" % 200 len(dco.unconsumed_tail)) 201 if flush: 202 bufs.append(dco.flush()) 203 else: 204 while True: 205 chunk = dco.decompress('') 206 if chunk: 207 bufs.append(chunk) 208 else: 209 break 210 self.assertEqual('', dco.unconsumed_tail, ######## 211 "(B) uct should be '': not %d long" % 212 len(dco.unconsumed_tail)) 213 self.assertEqual(data, ''.join(bufs)) 214 # Failure means: "decompressobj with init options failed" 215 216 def test_decompimax(self,sizes=[128],flush=True,source=None,cx=256,dcx=64): 217 # 
compress in steps, decompress in length-restricted steps, loop sizes 218 source = source or hamlet_scene 219 for reps in sizes: 220 # Check a decompression object with max_length specified 221 data = source * reps 222 co = zlib.compressobj(2, 8, -12, 9, 1) 223 bufs = [] 224 for i in range(0, len(data), cx): 225 bufs.append(co.compress(data[i:i+cx])) 226 bufs.append(co.flush()) 227 combuf = ''.join(bufs) 228 self.assertEqual(data, zlib.decompress(combuf, -12, -5), 229 'compressed data failure') 230 231 dco = zlib.decompressobj(-12) 232 bufs = [] 233 cb = combuf 234 while cb: 235 #max_length = 1 + len(cb)//10 236 chunk = dco.decompress(cb, dcx) 237 self.failIf(len(chunk) > dcx, 238 'chunk too big (%d>%d)' % (len(chunk), dcx)) 239 bufs.append(chunk) 240 cb = dco.unconsumed_tail 241 if flush: 242 bufs.append(dco.flush()) 243 else: 244 while True: 245 chunk = dco.decompress('', dcx) 246 self.failIf(len(chunk) > dcx, 247 'chunk too big in tail (%d>%d)' % (len(chunk), dcx)) 248 if chunk: 249 bufs.append(chunk) 250 else: 251 break 252 self.assertEqual(len(data), len(''.join(bufs))) 253 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') 254 255 def test_decompressmaxlen(self): 256 # Check a decompression object with max_length specified 257 data = hamlet_scene * 8 * 16 258 co = zlib.compressobj(2, 8, -12, 9, 1) 259 bufs = [] 260 for i in range(0, len(data), 256): 261 bufs.append(co.compress(data[i:i+256])) 262 bufs.append(co.flush()) 263 combuf = ''.join(bufs) 264 self.assertEqual(data, zlib.decompress(combuf, -12, -5), 265 'compressed data failure') 266 267 dco = zlib.decompressobj(-12) 268 bufs = [] 269 cb = combuf 270 while cb: 271 max_length = 1 + len(cb)//10 272 chunk = dco.decompress(cb, max_length) 273 self.failIf(len(chunk) > max_length, 274 'chunk too big (%d>%d)' % (len(chunk),max_length)) 275 bufs.append(chunk) 276 cb = dco.unconsumed_tail 277 bufs.append(dco.flush()) 278 self.assertEqual(len(data), len(''.join(bufs))) 279 self.assertEqual(data, 
''.join(bufs), 'Wrong data retrieved') 280 281 def test_decompressmaxlenflushless(self): 282 # identical to test_decompressmaxlen except flush is replaced 283 # with an equivalent. This works and other fails on (eg) 2.2.2 284 data = hamlet_scene * 8 * 16 285 co = zlib.compressobj(2, 8, -12, 9, 1) 286 bufs = [] 287 for i in range(0, len(data), 256): 288 bufs.append(co.compress(data[i:i+256])) 289 bufs.append(co.flush()) 290 combuf = ''.join(bufs) 291 self.assertEqual(data, zlib.decompress(combuf, -12, -5), 292 'compressed data mismatch') 293 294 dco = zlib.decompressobj(-12) 295 bufs = [] 296 cb = combuf 297 while cb: 298 max_length = 1 + len(cb)//10 299 chunk = dco.decompress(cb, max_length) 300 self.failIf(len(chunk) > max_length, 301 'chunk too big (%d>%d)' % (len(chunk),max_length)) 302 bufs.append(chunk) 303 cb = dco.unconsumed_tail 304 305 #bufs.append(dco.flush()) 306 while len(chunk): 307 chunk = dco.decompress('', max_length) 308 self.failIf(len(chunk) > max_length, 309 'chunk too big (%d>%d)' % (len(chunk),max_length)) 310 bufs.append(chunk) 311 312 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') 313 314 def test_maxlenmisc(self): 315 # Misc tests of max_length 316 dco = zlib.decompressobj(-12) 317 self.assertRaises(ValueError, dco.decompress, "", -1) 318 self.assertEqual('', dco.unconsumed_tail) 319 320 def test_flushes(self): 321 # Test flush() with the various options, using all the 322 # different levels in order to provide more variations. 
323 sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] 324 sync_opt = [getattr(zlib, opt) for opt in sync_opt 325 if hasattr(zlib, opt)] 326 data = hamlet_scene * 8 327 328 for sync in sync_opt: 329 for level in range(10): 330 obj = zlib.compressobj( level ) 331 a = obj.compress( data[:3000] ) 332 b = obj.flush( sync ) 333 c = obj.compress( data[3000:] ) 334 d = obj.flush() 335 self.assertEqual(zlib.decompress(''.join([a,b,c,d])), 336 data, ("Decompress failed: flush " 337 "mode=%i, level=%i") % (sync, level)) 338 del obj 339 340 def test_odd_flush(self): 341 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 342 import random 343 344 if hasattr(zlib, 'Z_SYNC_FLUSH'): 345 # Testing on 17K of "random" data 346 347 # Create compressor and decompressor objects 348 co = zlib.compressobj(9) 349 dco = zlib.decompressobj() 350 351 # Try 17K of data 352 # generate random data stream 353 try: 354 # In 2.3 and later, WichmannHill is the RNG of the bug report 355 gen = random.WichmannHill() 356 except AttributeError: 357 try: 358 # 2.2 called it Random 359 gen = random.Random() 360 except AttributeError: 361 # others might simply have a single RNG 362 gen = random 363 gen.seed(1) 364 data = genblock(1, 17 * 1024, generator=gen) 365 366 # compress, sync-flush, and decompress 367 first = co.compress(data) 368 second = co.flush(zlib.Z_SYNC_FLUSH) 369 expanded = dco.decompress(first + second) 370 371 # if decompressed data is different from the input data, choke. 
372 self.assertEqual(expanded, data, "17K random source doesn't match") 373 374 def test_manydecompinc(self): 375 # Run incremental decompress test for a large range of sizes 376 self.test_decompinc(sizes=[1<<n for n in range(8)], 377 flush=True, cx=32, dcx=4) 378 379 def test_manydecompimax(self): 380 # Run incremental decompress maxlen test for a large range of sizes 381 # avoid the flush bug 382 self.test_decompimax(sizes=[1<<n for n in range(8)], 383 flush=False, cx=32, dcx=4) 384 385 def test_manydecompimaxflush(self): 386 # Run incremental decompress maxlen test for a large range of sizes 387 # avoid the flush bug 388 self.test_decompimax(sizes=[1<<n for n in range(8)], 389 flush=True, cx=32, dcx=4) 390 391 392def genblock(seed, length, step=1024, generator=random): 393 """length-byte stream of random data from a seed (in step-byte blocks).""" 394 if seed is not None: 395 generator.seed(seed) 396 randint = generator.randint 397 if length < step or step < 2: 398 step = length 399 blocks = [] 400 for i in range(0, length, step): 401 blocks.append(''.join([chr(randint(0,255)) 402 for x in range(step)])) 403 return ''.join(blocks)[:length] 404 405 406 407def choose_lines(source, number, seed=None, generator=random): 408 """Return a list of number lines randomly chosen from the source""" 409 if seed is not None: 410 generator.seed(seed) 411 sources = source.split('\n') 412 return [generator.choice(sources) for n in range(number)] 413 414 415 416hamlet_scene = """ 417LAERTES 418 419 O, fear me not. 420 I stay too long: but here my father comes. 421 422 Enter POLONIUS 423 424 A double blessing is a double grace, 425 Occasion smiles upon a second leave. 426 427LORD POLONIUS 428 429 Yet here, Laertes! aboard, aboard, for shame! 430 The wind sits in the shoulder of your sail, 431 And you are stay'd for. There; my blessing with thee! 432 And these few precepts in thy memory 433 See thou character. Give thy thoughts no tongue, 434 Nor any unproportioned thought his act. 
435 Be thou familiar, but by no means vulgar. 436 Those friends thou hast, and their adoption tried, 437 Grapple them to thy soul with hoops of steel; 438 But do not dull thy palm with entertainment 439 Of each new-hatch'd, unfledged comrade. Beware 440 Of entrance to a quarrel, but being in, 441 Bear't that the opposed may beware of thee. 442 Give every man thy ear, but few thy voice; 443 Take each man's censure, but reserve thy judgment. 444 Costly thy habit as thy purse can buy, 445 But not express'd in fancy; rich, not gaudy; 446 For the apparel oft proclaims the man, 447 And they in France of the best rank and station 448 Are of a most select and generous chief in that. 449 Neither a borrower nor a lender be; 450 For loan oft loses both itself and friend, 451 And borrowing dulls the edge of husbandry. 452 This above all: to thine ownself be true, 453 And it must follow, as the night the day, 454 Thou canst not then be false to any man. 455 Farewell: my blessing season this in thee! 456 457LAERTES 458 459 Most humbly do I take my leave, my lord. 460 461LORD POLONIUS 462 463 The time invites you; go; your servants tend. 464 465LAERTES 466 467 Farewell, Ophelia; and remember well 468 What I have said to you. 469 470OPHELIA 471 472 'Tis in my memory lock'd, 473 And you yourself shall keep the key of it. 474 475LAERTES 476 477 Farewell. 
478""" 479 480 481def test_main(): 482 suite = unittest.TestSuite() 483 suite.addTest(unittest.makeSuite(ChecksumTestCase)) 484 suite.addTest(unittest.makeSuite(ExceptionTestCase)) 485 suite.addTest(unittest.makeSuite(CompressTestCase)) 486 suite.addTest(unittest.makeSuite(CompressObjectTestCase)) 487 test_support.run_suite(suite) 488 489if __name__ == "__main__": 490 test_main() 491 492def test(tests=''): 493 if not tests: tests = 'o' 494 suite = unittest.TestSuite() 495 if 'k' in tests: suite.addTest(unittest.makeSuite(ChecksumTestCase)) 496 if 'x' in tests: suite.addTest(unittest.makeSuite(ExceptionTestCase)) 497 if 'c' in tests: suite.addTest(unittest.makeSuite(CompressTestCase)) 498 if 'o' in tests: suite.addTest(unittest.makeSuite(CompressObjectTestCase)) 499 test_support.run_suite(suite) 500 501if False: 502 import sys 503 sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test') 504 import test_zlib as tz 505 ts, ut = tz.test_support, tz.unittest 506 su = ut.TestSuite() 507 su.addTest(ut.makeSuite(tz.CompressTestCase)) 508 ts.run_suite(su) 509