test_support.py revision e7d8be80ba634fa15ece6f503c33592e0d333361
"""Supporting definitions for the Python regression tests."""

if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')

import contextlib
import errno
import socket
import sys
import os
import shutil
import warnings
import unittest

__all__ = ["Error", "TestFailed", "TestSkipped", "ResourceDenied", "import_module",
           "verbose", "use_resources", "max_memuse", "record_original_stdout",
           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
           "findfile", "verify", "vereq", "sortdict", "check_syntax_error",
           "open_urlresource", "WarningMessage", "catch_warning", "CleanImport",
           "EnvironmentVarGuard", "captured_output",
           "captured_stdout", "TransientResource", "transient_internet",
           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
           "threading_cleanup", "reap_children"]

class Error(Exception):
    """Base class for regression test exceptions."""

class TestFailed(Error):
    """Test failed."""

class TestSkipped(Error):
    """Test skipped.

    This can be raised to indicate that a test was deliberately
    skipped, but not because a feature wasn't available. For
    example, if some resource can't be used, such as the network
    appears to be unavailable, this should be raised instead of
    TestFailed.
    """

class ResourceDenied(TestSkipped):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled. It is used to distinguish between expected
    and unexpected skips.
    """

def import_module(name, deprecated=False):
    """Import the module to be tested, raising TestSkipped if it is not
    available.

    If 'deprecated' is true, DeprecationWarnings whose message matches
    ".+ (module|package)" are ignored while the import runs; the warnings
    filter is restored afterwards by catch_warning()."""
    with catch_warning(record=False):
        if deprecated:
            warnings.filterwarnings("ignore", ".+ (module|package)",
                                    DeprecationWarning)
        try:
            module = __import__(name, level=0)
        except ImportError:
            raise TestSkipped("No module named " + name)
        else:
            return module

verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember 'stdout' as the stream returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none was
    recorded (or the recorded value is false)."""
    return _original_stdout or sys.stdout

def unload(name):
    """Remove 'name' from sys.modules; do nothing if it is not there."""
    try:
        del sys.modules[name]
    except KeyError:
        pass

def unlink(filename):
    """Delete 'filename', silently ignoring any OSError."""
    try:
        os.unlink(filename)
    except OSError:
        pass

def rmtree(path):
    """Remove the directory tree at 'path'; a missing tree is not an error.

    Any OSError other than ENOENT/ESRCH is re-raised."""
    try:
        shutil.rmtree(path)
    except OSError as e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise

def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    for dirname in sys.path:
        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
        # The .pyo file is removed by its own unlink() call so that a missing
        # .pyc never prevents an existing .pyo from being deleted.
        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))

def is_resource_enabled(resource):
    """Test whether a resource is enabled. Known resources are set by
    regrtest.py."""
    return use_resources is not None and resource in use_resources

def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ the resource is treated as enabled
    and the function returns immediately. Otherwise ResourceDenied is raised
    (with 'msg', or a default message) when is_resource_enabled() is false.
    The function always returns None."""
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)

HOST = 'localhost'

def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding. This is
    achieved by creating a temporary socket with the given 'family' and
    'socktype' (default is AF_INET, SOCK_STREAM), and binding it via
    bind_port() to HOST with the port set to 0, eliciting an unused
    ephemeral port from the OS. The temporary socket is then closed, and
    the ephemeral port is returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test. Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode). Always prefer bind_port() over find_unused_port() where
    possible. Hard coded ports should *NEVER* be used. As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports. An EADDRINUSE
    socket.error will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports. When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
    Source world compared to Windows ones, this is a common mistake. A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example. See
    http://bugs.python.org/issue2550 for more info. The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port. We can deal with this
    issue if/when we come across it."""
    tempsock = socket.socket(family, socktype)
    try:
        port = bind_port(tempsock)
    finally:
        # Close the temporary socket even if bind_port() raised, so a failed
        # probe does not leak a descriptor.
        tempsock.close()
    return port

def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number. Relies on
    ephemeral ports in order to ensure we are using an unbound port. This is
    important as many tests may be running simultaneously, especially in a
    buildbot environment. This method raises an exception if the sock.family
    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
    or SO_REUSEPORT set on it. Tests should *never* set these socket options
    for TCP/IP sockets. The only case for setting these options is testing
    multicasting via multiple UDP sockets.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    on Windows), it will be set on the socket. This will prevent anyone else
    from bind()'ing to our host/port for the duration of the test.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR " \
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                raise TestFailed("tests should never set the SO_REUSEPORT " \
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    # Port 0 asks the OS to pick the port; read the chosen one back.
    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port

FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    """Compare x and y cmp()-style, treating nearly equal floats as equal.

    If either argument is a float and the two differ by no more than
    (abs(x) + abs(y)) * FUZZ, return 0. Tuples/lists of the same type are
    compared element-wise with fcmp, then by length. Everything else falls
    back to an ordinary comparison returning -1, 0 or 1."""
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except Exception:
            # The other operand does not support the arithmetic; fall through
            # to the plain comparison below.
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for i in range(min(len(x), len(y))):
            outcome = fcmp(x[i], y[i])
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)

try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False

is_jython = sys.platform.startswith('java')

# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
            TESTFN_UNICODE_UNENCODEABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # errors=ignore' mode - hence we get '?' characters rather than
                # the exception. 'Latin1' operates as we expect - ie, fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                # Single parenthesised argument: prints identically whether
                # print is a statement or a function.
                print(
                    'WARNING: The filename %r CAN be encoded by the filesystem. '
                    'Unicode filename tests may not be effective'
                    % TESTFN_UNICODE_UNENCODEABLE)

# Make sure we can write to TESTFN, try in /tmp if we can't
fp = None
try:
    fp = open(TESTFN, 'w+')
except IOError:
    TMP_TESTFN = os.path.join('/tmp', TESTFN)
    try:
        fp = open(TMP_TESTFN, 'w+')
        TESTFN = TMP_TESTFN
        del TMP_TESTFN
    except IOError:
        print ('WARNING: tests will fail, unable to write to: %s or %s' %
                (TESTFN, TMP_TESTFN))
if fp is not None:
    fp.close()
    unlink(TESTFN)
del fp

def findfile(file, here=__file__):
    """Try to find a file on sys.path and the working directory. If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path)."""
    if os.path.isabs(file):
        return file
    # The directory containing 'here' is searched before sys.path.
    path = sys.path
    path = [os.path.dirname(here)] + path
    for dn in path:
        fn = os.path.join(dn, file)
        if os.path.exists(fn): return fn
    return file

def verify(condition, reason='test failed'):
    """Verify that condition is true. If not, raise TestFailed.

    The optional argument reason can be given to provide
    a better error text.
    """

    if not condition:
        raise TestFailed(reason)

def vereq(a, b):
    """Raise TestFailed if a == b is false.

    This is better than verify(a == b) because, in case of failure, the
    error message incorporates repr(a) and repr(b) so you can see the
    inputs.

    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
    former is tested.
    """

    if not (a == b):
        raise TestFailed("%r == %r" % (a, b))

def sortdict(dict):
    "Like repr(dict), but in sorted order."
    # sorted() yields the same ordering as items() followed by sort().
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas

def check_syntax_error(testcase, statement):
    """Fail 'testcase' unless compiling 'statement' raises SyntaxError."""
    try:
        compile(statement, '<test string>', 'exec')
    except SyntaxError:
        pass
    else:
        testcase.fail('Missing SyntaxError: "%s"' % statement)

def open_urlresource(url):
    """Return an open file for the resource at 'url'.

    Requires the 'urlfetch' resource. The last path component of the URL is
    used as the file name; if a file of that name already exists in the
    current or parent directory it is opened directly, otherwise the URL is
    retrieved into the current directory first."""
    import urllib, urlparse

    requires('urlfetch')
    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    for path in [os.path.curdir, os.path.pardir]:
        fn = os.path.join(path, filename)
        if os.path.exists(fn):
            return open(fn)

    # Progress goes to the original stdout so the user can see it.
    get_original_stdout().write('\tfetching %s ...\n' % url)
    fn, _ = urllib.urlretrieve(url, filename)
    return open(fn)


class WarningMessage(object):
    "Holds the result of a single showwarning() call"
    _WARNING_DETAILS = "message category filename lineno line".split()
    def __init__(self, message, category, filename, lineno, line=None):
        # Copy each constructor argument named in _WARNING_DETAILS onto the
        # instance under the same name.
        for attr in self._WARNING_DETAILS:
            setattr(self, attr, locals()[attr])
        self._category_name = category.__name__ if category else None

    def __str__(self):
        return ("{message : %r, category : %r, filename : %r, lineno : %s, "
                    "line : %r}" % (self.message, self._category_name,
                                    self.filename, self.lineno, self.line))

class WarningRecorder(object):
    "Records the result of any showwarning calls"
    def __init__(self):
        self.warnings = []
        self._set_last(None)

    def _showwarning(self, message, category, filename, lineno,
                    file=None, line=None):
        """Replacement for warnings.showwarning: store the warning instead
        of printing it ('file' is accepted but unused)."""
        wm = WarningMessage(message, category, filename, lineno, line)
        self.warnings.append(wm)
        self._set_last(wm)

    def _set_last(self, last_warning):
        """Mirror the details of the most recent warning as attributes of
        the recorder itself (all None when there is no warning)."""
        if last_warning is None:
            for attr in WarningMessage._WARNING_DETAILS:
                setattr(self, attr, None)
        else:
            for attr in WarningMessage._WARNING_DETAILS:
                setattr(self, attr, getattr(last_warning, attr))

    def reset(self):
        """Forget every recorded warning."""
        self.warnings = []
        self._set_last(None)

    def __str__(self):
        return '[%s]' % (', '.join(map(str, self.warnings)))

@contextlib.contextmanager
def catch_warning(module=warnings, record=True):
    """Guard the warnings filter from being permanently changed and
    optionally record the details of any warnings that are issued.

    Use like this:

        with catch_warning() as w:
            warnings.warn("foo")
            assert str(w.message) == "foo"

    When 'record' is false, showwarning is left alone and None is yielded.
    """
    original_filters = module.filters
    original_showwarning = module.showwarning
    if record:
        recorder = WarningRecorder()
        module.showwarning = recorder._showwarning
    else:
        recorder = None
    try:
        # Replace the filters with a copy of the original
        module.filters = module.filters[:]
        yield recorder
    finally:
        module.showwarning = original_showwarning
        module.filters = original_filters


class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            __import__("foo") # new reference
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name in sys.modules:
                module = sys.modules[module_name]
                # It is possible that module_name is just an alias for
                # another module (e.g. stub for modules renamed in 3.x).
                # In that case, we also need delete the real module to clear
                # the import cache.
                if module.__name__ != module_name:
                    # The real module may not be registered under its own
                    # name; that must not abort the clean-up.
                    sys.modules.pop(module.__name__, None)
                del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Put back every module that was present when the guard was created.
        sys.modules.update(self.original_modules)


class EnvironmentVarGuard(object):

    """Class to help protect the environment variable properly. Can be used as
    a context manager.

    The state a variable had the first time it was touched through set() or
    unset() is what __exit__ restores, however many times it is changed
    afterwards."""

    def __init__(self):
        self._environ = os.environ
        self._unset = set()     # variables that did not exist originally
        self._reset = dict()    # original values of variables that did

    def _remember(self, envvar):
        """Record the original state of 'envvar', once only."""
        if envvar in self._reset or envvar in self._unset:
            return
        if envvar in self._environ:
            self._reset[envvar] = self._environ[envvar]
        else:
            self._unset.add(envvar)

    def set(self, envvar, value):
        """Set 'envvar' to 'value', remembering its original state."""
        self._remember(envvar)
        self._environ[envvar] = value

    def unset(self, envvar):
        """Remove 'envvar' if present, remembering its original state."""
        self._remember(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for envvar, value in self._reset.items():
            self._environ[envvar] = value
        for unset in self._unset:
            # The variable may already be gone (e.g. set() then unset()).
            if unset in self._environ:
                del self._environ[unset]

class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied. Otherwise let the exception
        propagate (if any)."""
        # The raised type must be self.exc or one of its subclasses.
        if type_ is not None and issubclass(type_, self.exc):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                # Every requested attribute matched (or none was requested).
                raise ResourceDenied("an optional resource is not available")


def transient_internet():
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions."""
    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)


@contextlib.contextmanager
def captured_output(stream_name):
    """Run the 'with' statement body using a StringIO object in place of a
    specific attribute on the sys module.
    Example use (with 'stream_name=stdout')::

       with captured_stdout() as s:
           print "hello"
       assert s.getvalue() == "hello\\n"
    """
    import StringIO
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)

def captured_stdout():
    """Shorthand for captured_output("stdout")."""
    return captured_output("stdout")


#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

def run_with_locale(catstr, *locales):
    """Return a decorator that runs a function under another locale.

    'catstr' names a category attribute of the locale module (for example
    'LC_ALL'); an unknown name raises AttributeError when the decorated
    function is called. Each entry of 'locales' is tried in order and the
    first one setlocale() accepts is used. The locale in effect before the
    call is restored afterwards. If the current locale cannot be determined
    at all, the function simply runs without any locale change.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # this locale is not usable here; try the next one
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        # Make the wrapper look like the wrapped function in test reports.
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Set the module-level memory limits from a string such as '4G'.

    'limit' is a number (optionally with a fractional part) followed by one
    of K, M, G or T (case-insensitive) and an optional 'b'. On success
    real_max_memuse is set to the requested number of bytes and max_memuse
    to the same value capped at MAX_Py_ssize_t.

    Raises ValueError if the string cannot be parsed or the (capped) limit
    is below 2G - 1; in either case both globals are left untouched.
    """
    import re
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    requested = int(float(m.group(1)) * sizes[m.group(3).lower()])
    memlimit = requested
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    # Publish both values only once the limit has been accepted.
    real_max_memuse = requested
    max_memuse = memlimit

def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.failIf(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        # Expose the decorator arguments on the wrapper.
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def precisionbigmemtest(size, memuse, overhead=5*_1M):
    """Decorator for bigmem tests that need one exact size.

    When real_max_memuse is 0 the test runs with a size of 5147. Otherwise
    it runs with exactly 'size', unless real_max_memuse is smaller than
    size * memuse, in which case the test is skipped (a note is written to
    stderr when verbose). 'overhead' is only stored on the wrapper.
    """
    def decorator(f):
        def wrapper(self):
            if not real_max_memuse:
                maxsize = 5147
            else:
                maxsize = size

                if real_max_memuse and real_max_memuse < maxsize * memuse:
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return

            return f(self, maxsize)
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        # Only run when the configured limit covers the whole address space.
        if max_memuse < MAX_Py_ssize_t:
            if verbose:
                sys.stderr.write("Skipping %s because of memory "
                                 "constraint\n" % (f.__name__,))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.

class BasicTestRunner:
    """Minimal runner: run a test into a fresh TestResult and return it."""
    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result


def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Raises TestFailed if the run was not successful. The message is the
    traceback text of the single error or failure when there is exactly
    one, otherwise a generic message."""
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if not result.wasSuccessful():
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = "errors occurred; run in verbose mode for details"
        raise TestFailed(err)


def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestSuite/TestCase instance,
    or the name of a module present in sys.modules (its test cases are
    collected). A string that is not a key of sys.modules raises
    ValueError."""
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(unittest.findTestCases(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None):
    """Run doctest on the given module. Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest. Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed if any doctest fails.
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        verbosity = None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        # Single parenthesised argument: prints identically whether print is
        # a statement or a function.
        print('doctest (%s) ... %d tests with zero failures' % (module.__name__, t))
    return f, t

#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

def threading_setup():
    """Return the current sizes of threading._active and threading._limbo,
    for a later call to threading_cleanup()."""
    import threading
    return len(threading._active), len(threading._limbo)

def threading_cleanup(num_active, num_limbo):
    """Wait briefly for the thread bookkeeping to return to the given sizes.

    Each of the two tables is polled at most _MAX_COUNT times, 0.1 seconds
    apart; the function then returns whether or not the sizes match."""
    import threading
    import time

    _MAX_COUNT = 10
    count = 0
    while len(threading._active) != num_active and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)

    count = 0
    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)

def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started. This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # This will raise an exception on Windows. That's ok.
                pid, status = os.waitpid(any_process, os.WNOHANG)
                if pid == 0:
                    break
            except (OSError, AttributeError):
                # OSError: no children left, or waiting on -1 is unsupported.
                # AttributeError: os.WNOHANG is missing on this platform.
                break