test_support.py revision 6057b2e645dd3b7a262553d5a547ef45d3fc5d93
1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.test_support': 4 raise ImportError('test_support must be imported from the test package') 5 6import contextlib 7import errno 8import socket 9import sys 10import os 11import os.path 12import shutil 13import warnings 14import unittest 15 16class Error(Exception): 17 """Base class for regression test exceptions.""" 18 19class TestFailed(Error): 20 """Test failed.""" 21 22class TestSkipped(Error): 23 """Test skipped. 24 25 This can be raised to indicate that a test was deliberatly 26 skipped, but not because a feature wasn't available. For 27 example, if some resource can't be used, such as the network 28 appears to be unavailable, this should be raised instead of 29 TestFailed. 30 """ 31 32class ResourceDenied(TestSkipped): 33 """Test skipped because it requested a disallowed resource. 34 35 This is raised when a test calls requires() for a resource that 36 has not be enabled. It is used to distinguish between expected 37 and unexpected skips. 38 """ 39 40verbose = 1 # Flag set to 0 by regrtest.py 41use_resources = None # Flag set to [] by regrtest.py 42max_memuse = 0 # Disable bigmem tests (they will still be run with 43 # small sizes, to make sure they work.) 44 45# _original_stdout is meant to hold stdout at the time regrtest began. 46# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 47# The point is to have some flavor of stdout the user can actually see. 48_original_stdout = None 49def record_original_stdout(stdout): 50 global _original_stdout 51 _original_stdout = stdout 52 53def get_original_stdout(): 54 return _original_stdout or sys.stdout 55 56def unload(name): 57 try: 58 del sys.modules[name] 59 except KeyError: 60 pass 61 62def unlink(filename): 63 try: 64 os.unlink(filename) 65 except OSError: 66 pass 67 68def rmtree(path): 69 try: 70 shutil.rmtree(path) 71 except OSError, e: 72 # Unix returns ENOENT, Windows returns ESRCH. 
73 if e.errno not in (errno.ENOENT, errno.ESRCH): 74 raise 75 76def forget(modname): 77 '''"Forget" a module was ever imported by removing it from sys.modules and 78 deleting any .pyc and .pyo files.''' 79 unload(modname) 80 for dirname in sys.path: 81 unlink(os.path.join(dirname, modname + os.extsep + 'pyc')) 82 # Deleting the .pyo file cannot be within the 'try' for the .pyc since 83 # the chance exists that there is no .pyc (and thus the 'try' statement 84 # is exited) but there is a .pyo file. 85 unlink(os.path.join(dirname, modname + os.extsep + 'pyo')) 86 87def is_resource_enabled(resource): 88 """Test whether a resource is enabled. Known resources are set by 89 regrtest.py.""" 90 return use_resources is not None and resource in use_resources 91 92def requires(resource, msg=None): 93 """Raise ResourceDenied if the specified resource is not available. 94 95 If the caller's module is __main__ then automatically return True. The 96 possibility of False being returned occurs when regrtest.py is executing.""" 97 # see if the caller's module is __main__ - if so, treat as if 98 # the resource was set 99 if sys._getframe().f_back.f_globals.get("__name__") == "__main__": 100 return 101 if not is_resource_enabled(resource): 102 if msg is None: 103 msg = "Use of the `%s' resource not enabled" % resource 104 raise ResourceDenied(msg) 105 106def bind_port(sock, host='', preferred_port=54321): 107 """Try to bind the sock to a port. If we are running multiple 108 tests and we don't try multiple ports, the test can fails. This 109 makes the test more robust.""" 110 111 # Find some random ports that hopefully no one is listening on. 112 # Ideally each test would clean up after itself and not continue listening 113 # on any ports. However, this isn't the case. The last port (0) is 114 # a stop-gap that asks the O/S to assign a port. 
Whenever the warning 115 # message below is printed, the test that is listening on the port should 116 # be fixed to close the socket at the end of the test. 117 # Another reason why we can't use a port is another process (possibly 118 # another instance of the test suite) is using the same port. 119 for port in [preferred_port, 9907, 10243, 32999, 0]: 120 try: 121 sock.bind((host, port)) 122 if port == 0: 123 port = sock.getsockname()[1] 124 return port 125 except socket.error, (err, msg): 126 if err != errno.EADDRINUSE: 127 raise 128 print >>sys.__stderr__, \ 129 ' WARNING: failed to listen on port %d, trying another' % port 130 raise TestFailed('unable to find port to listen on') 131 132FUZZ = 1e-6 133 134def fcmp(x, y): # fuzzy comparison function 135 if type(x) == type(0.0) or type(y) == type(0.0): 136 try: 137 x, y = coerce(x, y) 138 fuzz = (abs(x) + abs(y)) * FUZZ 139 if abs(x-y) <= fuzz: 140 return 0 141 except: 142 pass 143 elif type(x) == type(y) and type(x) in (type(()), type([])): 144 for i in range(min(len(x), len(y))): 145 outcome = fcmp(x[i], y[i]) 146 if outcome != 0: 147 return outcome 148 return cmp(len(x), len(y)) 149 return cmp(x, y) 150 151try: 152 unicode 153 have_unicode = True 154except NameError: 155 have_unicode = False 156 157is_jython = sys.platform.startswith('java') 158 159# Filename used for testing 160if os.name == 'java': 161 # Jython disallows @ in module names 162 TESTFN = '$test' 163elif os.name == 'riscos': 164 TESTFN = 'testfile' 165else: 166 TESTFN = '@test' 167 # Unicode name only used if TEST_FN_ENCODING exists for the platform. 168 if have_unicode: 169 # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() 170 # TESTFN_UNICODE is a filename that can be encoded using the 171 # file system encoding, but *not* with the default (ascii) encoding 172 if isinstance('', unicode): 173 # python -U 174 # XXX perhaps unicode() should accept Unicode strings? 
175 TESTFN_UNICODE = "@test-\xe0\xf2" 176 else: 177 # 2 latin characters. 178 TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1") 179 TESTFN_ENCODING = sys.getfilesystemencoding() 180 # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be 181 # able to be encoded by *either* the default or filesystem encoding. 182 # This test really only makes sense on Windows NT platforms 183 # which have special Unicode support in posixmodule. 184 if (not hasattr(sys, "getwindowsversion") or 185 sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME 186 TESTFN_UNICODE_UNENCODEABLE = None 187 else: 188 # Japanese characters (I think - from bug 846133) 189 TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"') 190 try: 191 # XXX - Note - should be using TESTFN_ENCODING here - but for 192 # Windows, "mbcs" currently always operates as if in 193 # errors=ignore' mode - hence we get '?' characters rather than 194 # the exception. 'Latin1' operates as we expect - ie, fails. 195 # See [ 850997 ] mbcs encoding ignores errors 196 TESTFN_UNICODE_UNENCODEABLE.encode("Latin1") 197 except UnicodeEncodeError: 198 pass 199 else: 200 print \ 201 'WARNING: The filename %r CAN be encoded by the filesystem. ' \ 202 'Unicode filename tests may not be effective' \ 203 % TESTFN_UNICODE_UNENCODEABLE 204 205# Make sure we can write to TESTFN, try in /tmp if we can't 206fp = None 207try: 208 fp = open(TESTFN, 'w+') 209except IOError: 210 TMP_TESTFN = os.path.join('/tmp', TESTFN) 211 try: 212 fp = open(TMP_TESTFN, 'w+') 213 TESTFN = TMP_TESTFN 214 del TMP_TESTFN 215 except IOError: 216 print ('WARNING: tests will fail, unable to write to: %s or %s' % 217 (TESTFN, TMP_TESTFN)) 218if fp is not None: 219 fp.close() 220 unlink(TESTFN) 221del fp 222 223def findfile(file, here=__file__): 224 """Try to find a file on sys.path and the working directory. 
If it is not 225 found the argument passed to the function is returned (this does not 226 necessarily signal failure; could still be the legitimate path).""" 227 if os.path.isabs(file): 228 return file 229 path = sys.path 230 path = [os.path.dirname(here)] + path 231 for dn in path: 232 fn = os.path.join(dn, file) 233 if os.path.exists(fn): return fn 234 return file 235 236def verify(condition, reason='test failed'): 237 """Verify that condition is true. If not, raise TestFailed. 238 239 The optional argument reason can be given to provide 240 a better error text. 241 """ 242 243 if not condition: 244 raise TestFailed(reason) 245 246def vereq(a, b): 247 """Raise TestFailed if a == b is false. 248 249 This is better than verify(a == b) because, in case of failure, the 250 error message incorporates repr(a) and repr(b) so you can see the 251 inputs. 252 253 Note that "not (a == b)" isn't necessarily the same as "a != b"; the 254 former is tested. 255 """ 256 257 if not (a == b): 258 raise TestFailed("%r == %r" % (a, b)) 259 260def sortdict(dict): 261 "Like repr(dict), but in sorted order." 262 items = dict.items() 263 items.sort() 264 reprpairs = ["%r: %r" % pair for pair in items] 265 withcommas = ", ".join(reprpairs) 266 return "{%s}" % withcommas 267 268def check_syntax_error(testcase, statement): 269 try: 270 compile(statement, '<test string>', 'exec') 271 except SyntaxError: 272 pass 273 else: 274 testcase.fail('Missing SyntaxError: "%s"' % statement) 275 276def open_urlresource(url): 277 import urllib, urlparse 278 279 requires('urlfetch') 280 filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 281 282 for path in [os.path.curdir, os.path.pardir]: 283 fn = os.path.join(path, filename) 284 if os.path.exists(fn): 285 return open(fn) 286 287 print >> get_original_stdout(), '\tfetching %s ...' 
% url 288 fn, _ = urllib.urlretrieve(url, filename) 289 return open(fn) 290 291 292class WarningMessage(object): 293 "Holds the result of the latest showwarning() call" 294 def __init__(self): 295 self.message = None 296 self.category = None 297 self.filename = None 298 self.lineno = None 299 300 def _showwarning(self, message, category, filename, lineno, file=None): 301 self.message = message 302 self.category = category 303 self.filename = filename 304 self.lineno = lineno 305 306@contextlib.contextmanager 307def catch_warning(): 308 """ 309 Guard the warnings filter from being permanently changed and record the 310 data of the last warning that has been issued. 311 312 Use like this: 313 314 with catch_warning() as w: 315 warnings.warn("foo") 316 assert str(w.message) == "foo" 317 """ 318 warning = WarningMessage() 319 original_filters = warnings.filters[:] 320 original_showwarning = warnings.showwarning 321 warnings.showwarning = warning._showwarning 322 try: 323 yield warning 324 finally: 325 warnings.showwarning = original_showwarning 326 warnings.filters = original_filters 327 328class EnvironmentVarGuard(object): 329 330 """Class to help protect the environment variable properly. 
Can be used as 331 a context manager.""" 332 333 def __init__(self): 334 self._environ = os.environ 335 self._unset = set() 336 self._reset = dict() 337 338 def set(self, envvar, value): 339 if envvar not in self._environ: 340 self._unset.add(envvar) 341 else: 342 self._reset[envvar] = self._environ[envvar] 343 self._environ[envvar] = value 344 345 def unset(self, envvar): 346 if envvar in self._environ: 347 self._reset[envvar] = self._environ[envvar] 348 del self._environ[envvar] 349 350 def __enter__(self): 351 return self 352 353 def __exit__(self, *ignore_exc): 354 for envvar, value in self._reset.iteritems(): 355 self._environ[envvar] = value 356 for unset in self._unset: 357 del self._environ[unset] 358 359class TransientResource(object): 360 361 """Raise ResourceDenied if an exception is raised while the context manager 362 is in effect that matches the specified exception and attributes.""" 363 364 def __init__(self, exc, **kwargs): 365 self.exc = exc 366 self.attrs = kwargs 367 368 def __enter__(self): 369 return self 370 371 def __exit__(self, type_=None, value=None, traceback=None): 372 """If type_ is a subclass of self.exc and value has attributes matching 373 self.attrs, raise ResourceDenied. 
Otherwise let the exception 374 propagate (if any).""" 375 if type_ is not None and issubclass(self.exc, type_): 376 for attr, attr_value in self.attrs.iteritems(): 377 if not hasattr(value, attr): 378 break 379 if getattr(value, attr) != attr_value: 380 break 381 else: 382 raise ResourceDenied("an optional resource is not available") 383 384 385def transient_internet(): 386 """Return a context manager that raises ResourceDenied when various issues 387 with the Internet connection manifest themselves as exceptions.""" 388 time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) 389 socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) 390 ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) 391 return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset) 392 393 394@contextlib.contextmanager 395def captured_stdout(): 396 """Run the with statement body using a StringIO object as sys.stdout. 397 Example use:: 398 399 with captured_stdout() as s: 400 print "hello" 401 assert s.getvalue() == "hello" 402 """ 403 import StringIO 404 orig_stdout = sys.stdout 405 sys.stdout = StringIO.StringIO() 406 yield sys.stdout 407 sys.stdout = orig_stdout 408 409 410#======================================================================= 411# Decorator for running a function in a different locale, correctly resetting 412# it afterwards. 

def run_with_locale(catstr, *locales):
    """Return a decorator that runs a function under a different locale.

    'catstr' is the name of a locale category attribute (looked up on the
    locale module); 'locales' are tried in order and the first one that can
    be set is used. The original locale is put back when the function
    finishes, whether normally or through an exception.
    """
    def decorator(func):
        def wrapper(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                saved_locale = locale.setlocale(category)
            except AttributeError:
                # an invalid category string is the test author's mistake,
                # so let it surface
                raise
            except:
                # the original locale cannot be determined: leave the
                # locale alone altogether
                locale = saved_locale = None
            else:
                for candidate in locales:
                    try:
                        locale.setlocale(category, candidate)
                    except:
                        continue
                    break

            # call the function; the locale is reset even on exceptions
            try:
                result = func(*args, **kwds)
            finally:
                if locale and saved_locale:
                    locale.setlocale(category, saved_locale)
            return result
        wrapper.func_name = func.func_name
        wrapper.__doc__ = func.__doc__
        return wrapper
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G

# Hack to get at the maximum value an internal index can take.
class _Dummy:
    def __getslice__(self, i, j):
        # For an open-ended slice the upper bound passed in is returned.
        return j
MAX_Py_ssize_t = _Dummy()[:]

def set_memlimit(limit):
    """Set the global max_memuse from a string such as '2.5G' or '4Gb'.

    The string is a number followed by K, M, G or T (case-insensitive) and
    an optional 'b'. The result is capped at MAX_Py_ssize_t. ValueError is
    raised for a malformed string or a limit below 2G - 1.
    """
    global max_memuse
    import re
    multipliers = {'k': 1024, 'm': _1M, 'g': _1G, 't': 1024*_1G}
    match = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                     re.IGNORECASE | re.VERBOSE)
    if match is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    number = float(match.group(1))
    unit = match.group(3).lower()
    memlimit = min(int(number * multipliers[unit]), MAX_Py_ssize_t)
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit

def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if max_memuse:
                size = int((max_memuse - overhead) / memuse)
                if size < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                size = max(size - 50 * _1M, minsize)
            else:
                # max_memuse is 0 (the default): the test is still run,
                # with size set to a few kb, to make sure it works. Using
                # too much memory is still avoided, but noisily.
                size = 5147
                self.failIf(size * memuse + overhead > 20 * _1M)
            return f(self, size)
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        # Not enough memory allowed: skip, saying so in verbose mode.
        if verbose:
            sys.stderr.write("Skipping %s because of memory "
                             "constraint\n" % (f.__name__,))
    return wrapper

#=======================================================================
# unittest integration.
533 534class BasicTestRunner: 535 def run(self, test): 536 result = unittest.TestResult() 537 test(result) 538 return result 539 540 541def _run_suite(suite): 542 """Run tests from a unittest.TestSuite-derived class.""" 543 if verbose: 544 runner = unittest.TextTestRunner(sys.stdout, verbosity=2) 545 else: 546 runner = BasicTestRunner() 547 548 result = runner.run(suite) 549 if not result.wasSuccessful(): 550 if len(result.errors) == 1 and not result.failures: 551 err = result.errors[0][1] 552 elif len(result.failures) == 1 and not result.errors: 553 err = result.failures[0][1] 554 else: 555 err = "errors occurred; run in verbose mode for details" 556 raise TestFailed(err) 557 558 559def run_unittest(*classes): 560 """Run tests from unittest.TestCase-derived classes.""" 561 valid_types = (unittest.TestSuite, unittest.TestCase) 562 suite = unittest.TestSuite() 563 for cls in classes: 564 if isinstance(cls, str): 565 if cls in sys.modules: 566 suite.addTest(unittest.findTestCases(sys.modules[cls])) 567 else: 568 raise ValueError("str arguments must be keys in sys.modules") 569 elif isinstance(cls, valid_types): 570 suite.addTest(cls) 571 else: 572 suite.addTest(unittest.makeSuite(cls)) 573 _run_suite(suite) 574 575 576#======================================================================= 577# doctest driver. 578 579def run_doctest(module, verbosity=None): 580 """Run doctest on the given module. Return (#failures, #tests). 581 582 If optional argument verbosity is not specified (or is None), pass 583 test_support's belief about verbosity on to doctest. Else doctest's 584 usual behavior is used (it searches sys.argv for -v). 585 """ 586 587 import doctest 588 589 if verbosity is None: 590 verbosity = verbose 591 else: 592 verbosity = None 593 594 # Direct doctest output (normally just errors) to real stdout; doctest 595 # output shouldn't be compared by regrtest. 
596 save_stdout = sys.stdout 597 sys.stdout = get_original_stdout() 598 try: 599 f, t = doctest.testmod(module, verbose=verbosity) 600 if f: 601 raise TestFailed("%d of %d doctests failed" % (f, t)) 602 finally: 603 sys.stdout = save_stdout 604 if verbose: 605 print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t) 606 return f, t 607 608#======================================================================= 609# Threading support to prevent reporting refleaks when running regrtest.py -R 610 611def threading_setup(): 612 import threading 613 return len(threading._active), len(threading._limbo) 614 615def threading_cleanup(num_active, num_limbo): 616 import threading 617 import time 618 619 _MAX_COUNT = 10 620 count = 0 621 while len(threading._active) != num_active and count < _MAX_COUNT: 622 count += 1 623 time.sleep(0.1) 624 625 count = 0 626 while len(threading._limbo) != num_limbo and count < _MAX_COUNT: 627 count += 1 628 time.sleep(0.1) 629 630def reap_children(): 631 """Use this function at the end of test_main() whenever sub-processes 632 are started. This will help ensure that no extra children (zombies) 633 stick around to hog resources and create problems when looking 634 for refleaks. 635 """ 636 637 # Reap all our dead child processes so we don't leave zombies around. 638 # These hog resources and might be causing some of the buildbots to die. 639 if hasattr(os, 'waitpid'): 640 any_process = -1 641 while True: 642 try: 643 # This will raise an exception on Windows. That's ok. 644 pid, status = os.waitpid(any_process, os.WNOHANG) 645 if pid == 0: 646 break 647 except: 648 break 649