# test_zlib.py revision fb5d3e75bba8dfb567acefa6fd56ac56721bfe95
import unittest
from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
import binascii
import random
from test.test_support import precisionbigmemtest, _1G
import sys

try:
    import mmap
except ImportError:
    # mmap is optional; tests that need it check for None and skip.
    mmap = None

zlib = import_module('zlib')


class ChecksumTestCase(unittest.TestCase):
    # Checksum test cases for zlib.crc32() and zlib.adler32().

    def test_crc32start(self):
        # Omitting the start value must give the same result as passing 0.
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        # With empty input the start value is returned unchanged.
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        # Omitting the start value must give the same result as passing 1.
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        # With empty input the start value is returned unchanged.
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x80000000) is set.
        self.assertEqual(seen & 0x0FFFFFFFF, expected & 0x0FFFFFFFF)

    def test_penguins(self):
        # Known checksums, compared modulo 2**32 via assertEqual32().
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        # The default start values are 0 for crc32 and 1 for adler32.
        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        foo = 'abcdefghijklmnop'
        # explicitly test signed behavior
        self.assertEqual(zlib.crc32(foo), -1808088941)
        self.assertEqual(zlib.crc32('spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), -721416943)
        self.assertEqual(zlib.adler32('spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        # zlib.crc32() and binascii.crc32() must agree.
        foo = 'abcdefghijklmnop'
        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))

    def test_negative_crc_iv_input(self):
        # The range of valid input values for the crc state should be
        # -2**31 through 2**32-1 to allow inputs artificially constrained
        # to a signed 32-bit integer.
        self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffff))
        self.assertEqual(zlib.crc32('spam', -3141593),
                         zlib.crc32('spam', 0xffd01027))
        self.assertEqual(zlib.crc32('spam', -(2**31)),
                         zlib.crc32('spam', (2**31)))


# Issue #11277 - check that inputs of 2 GB are handled correctly.
# Be aware of issues #1202, #8650, #8651 and #10276
class ChecksumBigBufferTestCase(unittest.TestCase):
    # Checksums of a sparse file of int_max bytes, read through mmap.
    int_max = 0x7FFFFFFF

    @unittest.skipUnless(mmap, "mmap() is not available.")
    def test_big_buffer(self):
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            requires('largefile',
                     'test requires %s bytes and a long time to run' %
                     str(self.int_max))
        try:
            with open(TESTFN, "wb+") as f:
                # Seek close to int_max and write the last four bytes so
                # the file is exactly int_max bytes long.
                f.seek(self.int_max-4)
                f.write("asdf")
                f.flush()
                m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
                try:
                    self.assertEqual(zlib.crc32(m), 0x709418e7)
                    self.assertEqual(zlib.adler32(m), -2072837729)
                finally:
                    m.close()
        except (IOError, OverflowError):
            raise unittest.SkipTest("filesystem doesn't have largefile support")
        finally:
            unlink(TESTFN)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)


class BaseCompressTestCase(object):
    # Mixin with the big-buffer helpers shared by the one-shot and the
    # object-based test cases below.

    def check_big_compress_buffer(self, size, compress_func):
        # Feed at least `size` bytes of data to compress_func(data).
        _1M = 1024 * 1024
        fmt = "%%0%dx" % (2 * _1M)
        # Generate 10MB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        # Compress `size` bytes of 'x' and check that
        # decompress_func(compressed) gives them back.
        data = 'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip('x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegexp(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            # No max_length was given, so nothing may be left unconsumed.
            self.assertEqual('', dco.unconsumed_tail,
                             "(A) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain the decompressor by feeding it empty input until it
            # stops producing output.
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail,
                         "(B) uct should be '': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            # Input that did not fit into dcx output bytes is fed back in.
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # Keep asking for output with the last max_length until the
            # decompressor returns nothing more.
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    # Reported as skipped (rather than silently passing) when the zlib
    # module does not expose Z_SYNC_FLUSH.
    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    # The copy() tests are skipped, instead of being left undefined, when
    # the compression/decompression objects have no copy() method.

    @unittest.skipUnless(hasattr(zlib.compressobj(), "copy"),
                         'requires Compress.copy()')
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = HAMLET_SCENE.swapcase()
        c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        bufs0 = []
        bufs0.append(c0.compress(data0))

        # Fork the stream: c0 and c1 share the data0 prefix.
        c1 = c0.copy()
        bufs1 = bufs0[:]

        bufs0.append(c0.compress(data0))
        bufs0.append(c0.flush())
        s0 = ''.join(bufs0)

        bufs1.append(c1.compress(data1))
        bufs1.append(c1.flush())
        s1 = ''.join(bufs1)

        self.assertEqual(zlib.decompress(s0),data0+data0)
        self.assertEqual(zlib.decompress(s1),data0+data1)

    @unittest.skipUnless(hasattr(zlib.compressobj(), "copy"),
                         'requires Compress.copy()')
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)

    @unittest.skipUnless(hasattr(zlib.decompressobj(), "copy"),
                         'requires Decompress.copy()')
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)

        d0 = zlib.decompressobj()
        bufs0 = []
        bufs0.append(d0.decompress(comp[:32]))

        # Fork after the first 32 compressed bytes; both objects must
        # produce the same output for the remainder.
        d1 = d0.copy()
        bufs1 = bufs0[:]

        bufs0.append(d0.decompress(comp[32:]))
        s0 = ''.join(bufs0)

        bufs1.append(d1.decompress(comp[32:]))
        s1 = ''.join(bufs1)

        self.assertEqual(s0,s1)
        self.assertEqual(s0,data)

    @unittest.skipUnless(hasattr(zlib.decompressobj(), "copy"),
                         'requires Decompress.copy()')
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)


def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks).

    `generator` is seeded with `seed` unless `seed` is None.  It must
    provide seed() and randint().  A non-positive `length` gives ''.
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # length == 0 would otherwise end up as range(0, 0, 0) below, which
        # raises ValueError; there is nothing to generate anyway.
        return ''
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0,255))
                               for x in range(step)]))
    # The last block may overshoot; trim to the requested length.
    return ''.join(blocks)[:length]



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]



# Sample text used as compressible input by the tests above.
# NOTE(review): the interior indentation of this literal was reconstructed;
# the tests in this file only round-trip the text -- confirm against the
# pristine file if the exact bytes matter.
HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


def test_main():
    # Entry point: run every test case class defined in this module.
    run_unittest(
        ChecksumTestCase,
        ChecksumBigBufferTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()