test_zlib.py revision fc3bfad2e0376ea5aa8a0c5352aafd3d731aaad7
1import unittest 2from test import test_support 3import binascii 4import random 5from test.test_support import precisionbigmemtest, _1G 6 7zlib = test_support.import_module('zlib') 8 9 10class ChecksumTestCase(unittest.TestCase): 11 # checksum test cases 12 def test_crc32start(self): 13 self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) 14 self.assertTrue(zlib.crc32("abc", 0xffffffff)) 15 16 def test_crc32empty(self): 17 self.assertEqual(zlib.crc32("", 0), 0) 18 self.assertEqual(zlib.crc32("", 1), 1) 19 self.assertEqual(zlib.crc32("", 432), 432) 20 21 def test_adler32start(self): 22 self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) 23 self.assertTrue(zlib.adler32("abc", 0xffffffff)) 24 25 def test_adler32empty(self): 26 self.assertEqual(zlib.adler32("", 0), 0) 27 self.assertEqual(zlib.adler32("", 1), 1) 28 self.assertEqual(zlib.adler32("", 432), 432) 29 30 def assertEqual32(self, seen, expected): 31 # 32-bit values masked -- checksums on 32- vs 64- bit machines 32 # This is important if bit 31 (0x08000000L) is set. 33 self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL) 34 35 def test_penguins(self): 36 self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L) 37 self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94) 38 self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6) 39 self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7) 40 41 self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0)) 42 self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1)) 43 44 def test_abcdefghijklmnop(self): 45 """test issue1202 compliance: signed crc32, adler32 in 2.x""" 46 foo = 'abcdefghijklmnop' 47 # explicitly test signed behavior 48 self.assertEqual(zlib.crc32(foo), -1808088941) 49 self.assertEqual(zlib.crc32('spam'), 1138425661) 50 self.assertEqual(zlib.adler32(foo+foo), -721416943) 51 self.assertEqual(zlib.adler32('spam'), 72286642) 52 53 def test_same_as_binascii_crc32(self): 54 foo = 'abcdefghijklmnop' 55 self.assertEqual(binascii.crc32(foo), zlib.crc32(foo)) 56 self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam')) 57 58 def test_negative_crc_iv_input(self): 59 # The range of valid input values for the crc state should be 60 # -2**31 through 2**32-1 to allow inputs artifically constrained 61 # to a signed 32-bit integer. 62 self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL)) 63 self.assertEqual(zlib.crc32('spam', -3141593), 64 zlib.crc32('spam', 0xffd01027L)) 65 self.assertEqual(zlib.crc32('spam', -(2**31)), 66 zlib.crc32('spam', (2**31))) 67 68 69class ExceptionTestCase(unittest.TestCase): 70 # make sure we generate some expected errors 71 def test_badlevel(self): 72 # specifying compression level out of range causes an error 73 # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib 74 # accepts 0 too) 75 self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10) 76 77 def test_badcompressobj(self): 78 # verify failure on building compress object with bad params 79 self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0) 80 # specifying total bits too large causes an error 81 self.assertRaises(ValueError, 82 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1) 83 84 def test_baddecompressobj(self): 85 # verify failure on building decompress object with bad params 86 self.assertRaises(ValueError, zlib.decompressobj, -1) 87 88 def test_decompressobj_badflush(self): 89 # verify failure on calling decompressobj.flush with bad params 90 self.assertRaises(ValueError, zlib.decompressobj().flush, 0) 91 self.assertRaises(ValueError, zlib.decompressobj().flush, -1) 92 93 94class BaseCompressTestCase(object): 95 def check_big_compress_buffer(self, size, compress_func): 96 _1M = 1024 * 1024 97 fmt = "%%0%dx" % (2 * _1M) 98 # Generate 10MB worth of random, and expand it by repeating it. 99 # The assumption is that zlib's memory is not big enough to exploit 100 # such spread out redundancy. 101 data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M)) 102 for i in range(10)]) 103 data = data * (size // len(data) + 1) 104 try: 105 compress_func(data) 106 finally: 107 # Release memory 108 data = None 109 110 def check_big_decompress_buffer(self, size, decompress_func): 111 data = 'x' * size 112 try: 113 compressed = zlib.compress(data, 1) 114 finally: 115 # Release memory 116 data = None 117 data = decompress_func(compressed) 118 # Sanity check 119 try: 120 self.assertEqual(len(data), size) 121 self.assertEqual(len(data.strip('x')), 0) 122 finally: 123 data = None 124 125 126class CompressTestCase(BaseCompressTestCase, unittest.TestCase): 127 # Test compression in one go (whole message compression) 128 def test_speech(self): 129 x = zlib.compress(HAMLET_SCENE) 130 self.assertEqual(zlib.decompress(x), HAMLET_SCENE) 131 132 def test_speech128(self): 133 # compress more data 134 data = HAMLET_SCENE * 128 135 x = zlib.compress(data) 136 self.assertEqual(zlib.decompress(x), data) 137 138 def test_incomplete_stream(self): 139 # An useful error message is given 140 x = zlib.compress(HAMLET_SCENE) 141 self.assertRaisesRegexp(zlib.error, 142 "Error -5 while decompressing data: incomplete or truncated stream", 143 zlib.decompress, x[:-1]) 144 145 # Memory use of the following functions takes into account overallocation 146 147 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) 148 def test_big_compress_buffer(self, size): 149 compress = lambda s: zlib.compress(s, 1) 150 self.check_big_compress_buffer(size, compress) 151 152 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) 153 def test_big_decompress_buffer(self, size): 154 self.check_big_decompress_buffer(size, zlib.decompress) 155 156 157class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): 158 # Test compression object 159 def test_pair(self): 160 # straightforward compress/decompress objects 161 data = HAMLET_SCENE * 128 162 co = zlib.compressobj() 163 x1 = co.compress(data) 164 x2 = co.flush() 165 self.assertRaises(zlib.error, co.flush) # second flush should not work 166 dco = zlib.decompressobj() 167 y1 = dco.decompress(x1 + x2) 168 y2 = dco.flush() 169 self.assertEqual(data, y1 + y2) 170 171 def test_compressoptions(self): 172 # specify lots of options to compressobj() 173 level = 2 174 method = zlib.DEFLATED 175 wbits = -12 176 memlevel = 9 177 strategy = zlib.Z_FILTERED 178 co = zlib.compressobj(level, method, wbits, memlevel, strategy) 179 x1 = co.compress(HAMLET_SCENE) 180 x2 = co.flush() 181 dco = zlib.decompressobj(wbits) 182 y1 = dco.decompress(x1 + x2) 183 y2 = dco.flush() 184 self.assertEqual(HAMLET_SCENE, y1 + y2) 185 186 def test_compressincremental(self): 187 # compress object in steps, decompress object as one-shot 188 data = HAMLET_SCENE * 128 189 co = zlib.compressobj() 190 bufs = [] 191 for i in range(0, len(data), 256): 192 bufs.append(co.compress(data[i:i+256])) 193 bufs.append(co.flush()) 194 combuf = ''.join(bufs) 195 196 dco = zlib.decompressobj() 197 y1 = dco.decompress(''.join(bufs)) 198 y2 = dco.flush() 199 self.assertEqual(data, y1 + y2) 200 201 def test_decompinc(self, flush=False, source=None, cx=256, dcx=64): 202 # compress object in steps, decompress object in steps 203 source = source or HAMLET_SCENE 204 data = source * 128 205 co = zlib.compressobj() 206 bufs = [] 207 for i in range(0, len(data), cx): 208 bufs.append(co.compress(data[i:i+cx])) 209 bufs.append(co.flush()) 210 combuf = ''.join(bufs) 211 212 self.assertEqual(data, zlib.decompress(combuf)) 213 214 dco = zlib.decompressobj() 215 bufs = [] 216 for i in range(0, len(combuf), dcx): 217 bufs.append(dco.decompress(combuf[i:i+dcx])) 218 self.assertEqual('', dco.unconsumed_tail, ######## 219 "(A) uct should be '': not %d long" % 220 len(dco.unconsumed_tail)) 221 if flush: 222 bufs.append(dco.flush()) 223 else: 224 while True: 225 chunk = dco.decompress('') 226 if chunk: 227 bufs.append(chunk) 228 else: 229 break 230 self.assertEqual('', dco.unconsumed_tail, ######## 231 "(B) uct should be '': not %d long" % 232 len(dco.unconsumed_tail)) 233 self.assertEqual(data, ''.join(bufs)) 234 # Failure means: "decompressobj with init options failed" 235 236 def test_decompincflush(self): 237 self.test_decompinc(flush=True) 238 239 def test_decompimax(self, source=None, cx=256, dcx=64): 240 # compress in steps, decompress in length-restricted steps 241 source = source or HAMLET_SCENE 242 # Check a decompression object with max_length specified 243 data = source * 128 244 co = zlib.compressobj() 245 bufs = [] 246 for i in range(0, len(data), cx): 247 bufs.append(co.compress(data[i:i+cx])) 248 bufs.append(co.flush()) 249 combuf = ''.join(bufs) 250 self.assertEqual(data, zlib.decompress(combuf), 251 'compressed data failure') 252 253 dco = zlib.decompressobj() 254 bufs = [] 255 cb = combuf 256 while cb: 257 #max_length = 1 + len(cb)//10 258 chunk = dco.decompress(cb, dcx) 259 self.assertFalse(len(chunk) > dcx, 260 'chunk too big (%d>%d)' % (len(chunk), dcx)) 261 bufs.append(chunk) 262 cb = dco.unconsumed_tail 263 bufs.append(dco.flush()) 264 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') 265 266 def test_decompressmaxlen(self, flush=False): 267 # Check a decompression object with max_length specified 268 data = HAMLET_SCENE * 128 269 co = zlib.compressobj() 270 bufs = [] 271 for i in range(0, len(data), 256): 272 bufs.append(co.compress(data[i:i+256])) 273 bufs.append(co.flush()) 274 combuf = ''.join(bufs) 275 self.assertEqual(data, zlib.decompress(combuf), 276 'compressed data failure') 277 278 dco = zlib.decompressobj() 279 bufs = [] 280 cb = combuf 281 while cb: 282 max_length = 1 + len(cb)//10 283 chunk = dco.decompress(cb, max_length) 284 self.assertFalse(len(chunk) > max_length, 285 'chunk too big (%d>%d)' % (len(chunk),max_length)) 286 bufs.append(chunk) 287 cb = dco.unconsumed_tail 288 if flush: 289 bufs.append(dco.flush()) 290 else: 291 while chunk: 292 chunk = dco.decompress('', max_length) 293 self.assertFalse(len(chunk) > max_length, 294 'chunk too big (%d>%d)' % (len(chunk),max_length)) 295 bufs.append(chunk) 296 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') 297 298 def test_decompressmaxlenflush(self): 299 self.test_decompressmaxlen(flush=True) 300 301 def test_maxlenmisc(self): 302 # Misc tests of max_length 303 dco = zlib.decompressobj() 304 self.assertRaises(ValueError, dco.decompress, "", -1) 305 self.assertEqual('', dco.unconsumed_tail) 306 307 def test_flushes(self): 308 # Test flush() with the various options, using all the 309 # different levels in order to provide more variations. 310 sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] 311 sync_opt = [getattr(zlib, opt) for opt in sync_opt 312 if hasattr(zlib, opt)] 313 data = HAMLET_SCENE * 8 314 315 for sync in sync_opt: 316 for level in range(10): 317 obj = zlib.compressobj( level ) 318 a = obj.compress( data[:3000] ) 319 b = obj.flush( sync ) 320 c = obj.compress( data[3000:] ) 321 d = obj.flush() 322 self.assertEqual(zlib.decompress(''.join([a,b,c,d])), 323 data, ("Decompress failed: flush " 324 "mode=%i, level=%i") % (sync, level)) 325 del obj 326 327 def test_odd_flush(self): 328 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 329 import random 330 331 if hasattr(zlib, 'Z_SYNC_FLUSH'): 332 # Testing on 17K of "random" data 333 334 # Create compressor and decompressor objects 335 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 336 dco = zlib.decompressobj() 337 338 # Try 17K of data 339 # generate random data stream 340 try: 341 # In 2.3 and later, WichmannHill is the RNG of the bug report 342 gen = random.WichmannHill() 343 except AttributeError: 344 try: 345 # 2.2 called it Random 346 gen = random.Random() 347 except AttributeError: 348 # others might simply have a single RNG 349 gen = random 350 gen.seed(1) 351 data = genblock(1, 17 * 1024, generator=gen) 352 353 # compress, sync-flush, and decompress 354 first = co.compress(data) 355 second = co.flush(zlib.Z_SYNC_FLUSH) 356 expanded = dco.decompress(first + second) 357 358 # if decompressed data is different from the input data, choke. 359 self.assertEqual(expanded, data, "17K random source doesn't match") 360 361 def test_empty_flush(self): 362 # Test that calling .flush() on unused objects works. 363 # (Bug #1083110 -- calling .flush() on decompress objects 364 # caused a core dump.) 365 366 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 367 self.assertTrue(co.flush()) # Returns a zlib header 368 dco = zlib.decompressobj() 369 self.assertEqual(dco.flush(), "") # Returns nothing 370 371 def test_decompress_incomplete_stream(self): 372 # This is 'foo', deflated 373 x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' 374 # For the record 375 self.assertEqual(zlib.decompress(x), 'foo') 376 self.assertRaises(zlib.error, zlib.decompress, x[:-5]) 377 # Omitting the stream end works with decompressor objects 378 # (see issue #8672). 379 dco = zlib.decompressobj() 380 y = dco.decompress(x[:-5]) 381 y += dco.flush() 382 self.assertEqual(y, 'foo') 383 384 if hasattr(zlib.compressobj(), "copy"): 385 def test_compresscopy(self): 386 # Test copying a compression object 387 data0 = HAMLET_SCENE 388 data1 = HAMLET_SCENE.swapcase() 389 c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 390 bufs0 = [] 391 bufs0.append(c0.compress(data0)) 392 393 c1 = c0.copy() 394 bufs1 = bufs0[:] 395 396 bufs0.append(c0.compress(data0)) 397 bufs0.append(c0.flush()) 398 s0 = ''.join(bufs0) 399 400 bufs1.append(c1.compress(data1)) 401 bufs1.append(c1.flush()) 402 s1 = ''.join(bufs1) 403 404 self.assertEqual(zlib.decompress(s0),data0+data0) 405 self.assertEqual(zlib.decompress(s1),data0+data1) 406 407 def test_badcompresscopy(self): 408 # Test copying a compression object in an inconsistent state 409 c = zlib.compressobj() 410 c.compress(HAMLET_SCENE) 411 c.flush() 412 self.assertRaises(ValueError, c.copy) 413 414 if hasattr(zlib.decompressobj(), "copy"): 415 def test_decompresscopy(self): 416 # Test copying a decompression object 417 data = HAMLET_SCENE 418 comp = zlib.compress(data) 419 420 d0 = zlib.decompressobj() 421 bufs0 = [] 422 bufs0.append(d0.decompress(comp[:32])) 423 424 d1 = d0.copy() 425 bufs1 = bufs0[:] 426 427 bufs0.append(d0.decompress(comp[32:])) 428 s0 = ''.join(bufs0) 429 430 bufs1.append(d1.decompress(comp[32:])) 431 s1 = ''.join(bufs1) 432 433 self.assertEqual(s0,s1) 434 self.assertEqual(s0,data) 435 436 def test_baddecompresscopy(self): 437 # Test copying a compression object in an inconsistent state 438 data = zlib.compress(HAMLET_SCENE) 439 d = zlib.decompressobj() 440 d.decompress(data) 441 d.flush() 442 self.assertRaises(ValueError, d.copy) 443 444 # Memory use of the following functions takes into account overallocation 445 446 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) 447 def test_big_compress_buffer(self, size): 448 c = zlib.compressobj(1) 449 compress = lambda s: c.compress(s) + c.flush() 450 self.check_big_compress_buffer(size, compress) 451 452 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) 453 def test_big_decompress_buffer(self, size): 454 d = zlib.decompressobj() 455 decompress = lambda s: d.decompress(s) + d.flush() 456 self.check_big_decompress_buffer(size, decompress) 457 458 459def genblock(seed, length, step=1024, generator=random): 460 """length-byte stream of random data from a seed (in step-byte blocks).""" 461 if seed is not None: 462 generator.seed(seed) 463 randint = generator.randint 464 if length < step or step < 2: 465 step = length 466 blocks = [] 467 for i in range(0, length, step): 468 blocks.append(''.join([chr(randint(0,255)) 469 for x in range(step)])) 470 return ''.join(blocks)[:length] 471 472 473 474def choose_lines(source, number, seed=None, generator=random): 475 """Return a list of number lines randomly chosen from the source""" 476 if seed is not None: 477 generator.seed(seed) 478 sources = source.split('\n') 479 return [generator.choice(sources) for n in range(number)] 480 481 482 483HAMLET_SCENE = """ 484LAERTES 485 486 O, fear me not. 487 I stay too long: but here my father comes. 488 489 Enter POLONIUS 490 491 A double blessing is a double grace, 492 Occasion smiles upon a second leave. 493 494LORD POLONIUS 495 496 Yet here, Laertes! aboard, aboard, for shame! 497 The wind sits in the shoulder of your sail, 498 And you are stay'd for. There; my blessing with thee! 499 And these few precepts in thy memory 500 See thou character. Give thy thoughts no tongue, 501 Nor any unproportioned thought his act. 502 Be thou familiar, but by no means vulgar. 503 Those friends thou hast, and their adoption tried, 504 Grapple them to thy soul with hoops of steel; 505 But do not dull thy palm with entertainment 506 Of each new-hatch'd, unfledged comrade. Beware 507 Of entrance to a quarrel, but being in, 508 Bear't that the opposed may beware of thee. 509 Give every man thy ear, but few thy voice; 510 Take each man's censure, but reserve thy judgment. 511 Costly thy habit as thy purse can buy, 512 But not express'd in fancy; rich, not gaudy; 513 For the apparel oft proclaims the man, 514 And they in France of the best rank and station 515 Are of a most select and generous chief in that. 516 Neither a borrower nor a lender be; 517 For loan oft loses both itself and friend, 518 And borrowing dulls the edge of husbandry. 519 This above all: to thine ownself be true, 520 And it must follow, as the night the day, 521 Thou canst not then be false to any man. 522 Farewell: my blessing season this in thee! 523 524LAERTES 525 526 Most humbly do I take my leave, my lord. 527 528LORD POLONIUS 529 530 The time invites you; go; your servants tend. 531 532LAERTES 533 534 Farewell, Ophelia; and remember well 535 What I have said to you. 536 537OPHELIA 538 539 'Tis in my memory lock'd, 540 And you yourself shall keep the key of it. 541 542LAERTES 543 544 Farewell. 545""" 546 547 548def test_main(): 549 test_support.run_unittest( 550 ChecksumTestCase, 551 ExceptionTestCase, 552 CompressTestCase, 553 CompressObjectTestCase 554 ) 555 556if __name__ == "__main__": 557 test_main() 558