# test_zlib.py revision c856fa811dfdd6d17396d9aa522eea19b0f46c93
"""Regression tests for the zlib module (checksums, errors, compression)."""

import unittest
from test import test_support
import zlib
import binascii
import random


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
        self.assertTrue(zlib.crc32("abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32("", 0), 0)
        self.assertEqual(zlib.crc32("", 1), 1)
        self.assertEqual(zlib.crc32("", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
        self.assertTrue(zlib.adler32("abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32("", 0), 0)
        self.assertEqual(zlib.adler32("", 1), 1)
        self.assertEqual(zlib.adler32("", 432), 432)

    def assertEqual32(self, seen, expected):
        # 32-bit values masked -- checksums on 32- vs 64- bit machines
        # This is important if bit 31 (0x80000000) is set.
        # (No 'L' suffix needed: hex literals are never negative.)
        self.assertEqual(seen & 0x0FFFFFFFF, expected & 0x0FFFFFFFF)

    def test_penguins(self):
        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120)
        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
        self.assertEqual(zlib.adler32("penguin"), zlib.adler32("penguin", 1))

    def test_abcdefghijklmnop(self):
        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
        foo = 'abcdefghijklmnop'
        # explicitly test signed behavior
        self.assertEqual(zlib.crc32(foo), -1808088941)
        self.assertEqual(zlib.crc32('spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), -721416943)
        self.assertEqual(zlib.adler32('spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = 'abcdefghijklmnop'
        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))



class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, 0)



class CompressTestCase(unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.decompress(x), data)




class CompressObjectTestCase(unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        #   flush  -- finish with dco.flush() instead of draining with
        #             repeated dco.decompress('') calls
        #   source -- text to repeat 128 times (default HAMLET_SCENE)
        #   cx/dcx -- chunk sizes fed to compress() / decompress()
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual('', dco.unconsumed_tail,
                             "(A) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain whatever is still buffered until nothing more comes out.
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail,
                         "(B) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            # Input not consumed because of the length cap is fed back in.
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # Keep pulling with the last max_length until an empty chunk
            # signals that the decompressor has nothing left.
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj(level)
                a = obj.compress(data[:3000])
                b = obj.flush(sync)
                c = obj.compress(data[3000:])
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data

            # Create compressor and decompressor objects
            co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            dco = zlib.decompressobj()

            # Try 17K of data
            # generate random data stream
            try:
                # In 2.3 and later, WichmannHill is the RNG of the bug report
                gen = random.WichmannHill()
            except AttributeError:
                try:
                    # 2.2 called it Random
                    gen = random.Random()
                except AttributeError:
                    # others might simply have a single RNG
                    gen = random
            gen.seed(1)
            data = genblock(1, 17 * 1024, generator=gen)

            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)

            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    # The copy() tests are only defined when this zlib build offers copy().
    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            data0 = HAMLET_SCENE
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = c0.copy()
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = ''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = ''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0+data0)
            self.assertEqual(zlib.decompress(s1), data0+data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            c.flush()
            self.assertRaises(ValueError, c.copy)

    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            data = HAMLET_SCENE
            comp = zlib.compress(data)

            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = d0.copy()
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = ''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = ''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

        def test_baddecompresscopy(self):
            # Test copying a decompression object in an inconsistent state
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            d.decompress(data)
            d.flush()
            self.assertRaises(ValueError, d.copy)

def genblock(seed, length, step=1024, generator=random):
    """Return a length-byte string of random data, built in step-byte blocks.

    seed      -- if not None, passed to generator.seed() before generating
    length    -- number of bytes to return; zero or less yields ''
    step      -- block size used while generating; a step below 2 or above
                 length is replaced by a single block of length bytes
    generator -- object providing seed() and randint() (default: the
                 random module)
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # step would be set to length below, and range() rejects a zero
        # step, so an empty request is answered here.
        return ''
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0, 255))
                               for x in range(step)]))
    # The last block may overshoot; trim to the requested size.
    return ''.join(blocks)[:length]



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source.

    source is split on newlines; lines are picked with generator.choice(),
    so the same line may appear more than once.  If seed is not None it is
    passed to generator.seed() first.
    """
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]



# NOTE(review): runs of whitespace inside this literal were collapsed in the
# copy this file was restored from; the indentation of the speech lines is
# reconstructed -- confirm against the repository.  The tests only round-trip
# this text, so they do not depend on its exact bytes.
HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


def test_main():
    """Run every test case class in this module through test_support."""
    test_support.run_unittest(
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase
    )

if __name__ == "__main__":
    test_main()