# test_support.py revision 9d4418242757f616ca7041409c7015e166b2c9f9
"""Supporting definitions for the Python regression tests."""

if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')

import contextlib
import errno
import socket
import sys
import os
import os.path
import shutil
import warnings
import unittest

class Error(Exception):
    """Base class for regression test exceptions."""

class TestFailed(Error):
    """Test failed."""

class TestSkipped(Error):
    """Test skipped.

    This can be raised to indicate that a test was deliberately
    skipped, but not because a feature wasn't available.  For
    example, if some resource can't be used, such as the network
    appears to be unavailable, this should be raised instead of
    TestFailed.
    """

class ResourceDenied(TestSkipped):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.
    """

def import_module(name, deprecated=False):
    """Import the module to be tested, raising TestSkipped if it is not
    available.

    name       -- module name handed to __import__() (absolute import,
                  level=0).
    deprecated -- when true, DeprecationWarnings whose message matches
                  ".+ module" are ignored while the import runs.

    Returns whatever __import__() returns.  The warnings filter is restored
    by catch_warning() once the import has finished.
    """
    with catch_warning():
        if deprecated:
            warnings.filterwarnings("ignore", ".+ module", DeprecationWarning)
        try:
            module = __import__(name, level=0)
        except ImportError:
            raise TestSkipped("No module named " + name)
        else:
            return module

verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember 'stdout' as the stream get_original_stdout() should return."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none (or a
    false value) was recorded."""
    return _original_stdout or sys.stdout

def unload(name):
    """Remove 'name' from sys.modules; a missing entry is not an error."""
    try:
        del sys.modules[name]
    except KeyError:
        pass

def unlink(filename):
    """Delete 'filename', silently ignoring any OSError."""
    try:
        os.unlink(filename)
    except OSError:
        pass

def rmtree(path):
    """Remove the directory tree at 'path'; a missing tree is not an error.

    Any OSError other than ENOENT/ESRCH is re-raised.
    """
    try:
        shutil.rmtree(path)
    except OSError as e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise

def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    for dirname in sys.path:
        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
        # The .pyo file is removed by its own unlink() call so that a missing
        # .pyc does not prevent an existing .pyo from being deleted.
        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))

def is_resource_enabled(resource):
    """Test whether a resource is enabled.  Known resources are set by
    regrtest.py."""
    return use_resources is not None and resource in use_resources

def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ the resource is treated as enabled
    and the function simply returns.  Otherwise ResourceDenied is raised
    (with 'msg', or a default message) unless is_resource_enabled() reports
    the resource as enabled.  Nothing useful is returned."""
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)

HOST = 'localhost'

def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.  This is
    achieved by creating a temporary socket with the given family and type
    (default is AF_INET, SOCK_STREAM), and binding it via bind_port() to
    HOST with the port set to 0, eliciting an unused ephemeral port from the
    OS.  The temporary socket is then closed, and the ephemeral port is
    returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode).  Always prefer bind_port() over find_unused_port() where
    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment.  On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports.  An EADDRINUSE
    socket.error will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports.  When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    Source world compared to Windows ones, this is a common mistake.  A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example.  See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port.  We can deal with this
    issue if/when we come across it."""
    tempsock = socket.socket(family, socktype)
    try:
        port = bind_port(tempsock)
    finally:
        # Close the temporary socket even if bind_port() raised.
        tempsock.close()
    return port

def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number.  Relies on
    ephemeral ports in order to ensure we are using an unbound port.  This is
    important as many tests may be running simultaneously, especially in a
    buildbot environment.  This method raises an exception if the sock.family
    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
    for TCP/IP sockets.  The only case for setting these options is testing
    multicasting via multiple UDP sockets.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    on Windows), it will be set on the socket.  This will prevent anyone else
    from bind()'ing to our host/port for the duration of the test.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR " \
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                raise TestFailed("tests should never set the SO_REUSEPORT " \
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    # Port 0 asks the OS to pick the port; read it back from the socket.
    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port

FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    """cmp()-style comparison that treats nearly-equal floats as equal.

    If either argument is a float, the pair is coerced and 0 is returned when
    the difference is within (abs(x) + abs(y)) * FUZZ.  Tuples and lists of
    the same type are compared element-wise, then by length.  Everything else
    falls through to cmp().
    """
    if type(x) == type(0.0) or type(y) == type(0.0):
        try:
            x, y = coerce(x, y)
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except Exception:
            # Could not coerce/compare numerically; use plain cmp() below.
            pass
    elif type(x) == type(y) and type(x) in (type(()), type([])):
        for i in range(min(len(x), len(y))):
            outcome = fcmp(x[i], y[i])
            if outcome != 0:
                return outcome
        return cmp(len(x), len(y))
    return cmp(x, y)

try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False

is_jython = sys.platform.startswith('java')

# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
            TESTFN_UNICODE_UNENCODEABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # 'errors=ignore' mode - hence we get '?' characters rather than
                # the exception.  'Latin1' operates as we expect - ie, fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                print(
                    'WARNING: The filename %r CAN be encoded by the filesystem. '
                    'Unicode filename tests may not be effective'
                    % TESTFN_UNICODE_UNENCODEABLE)

# Make sure we can write to TESTFN, try in /tmp if we can't
fp = None
try:
    fp = open(TESTFN, 'w+')
except IOError:
    TMP_TESTFN = os.path.join('/tmp', TESTFN)
    try:
        fp = open(TMP_TESTFN, 'w+')
        TESTFN = TMP_TESTFN
        del TMP_TESTFN
    except IOError:
        print ('WARNING: tests will fail, unable to write to: %s or %s' %
                (TESTFN, TMP_TESTFN))
if fp is not None:
    fp.close()
    unlink(TESTFN)
del fp

def findfile(file, here=__file__):
    """Try to find a file on sys.path and the working directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path)."""
    if os.path.isabs(file):
        return file
    # Search the directory containing 'here' first, then sys.path.
    path = sys.path
    path = [os.path.dirname(here)] + path
    for dn in path:
        fn = os.path.join(dn, file)
        if os.path.exists(fn): return fn
    return file

def verify(condition, reason='test failed'):
    """Verify that condition is true. If not, raise TestFailed.

       The optional argument reason can be given to provide
       a better error text.
    """

    if not condition:
        raise TestFailed(reason)

def vereq(a, b):
    """Raise TestFailed if a == b is false.

    This is better than verify(a == b) because, in case of failure, the
    error message incorporates repr(a) and repr(b) so you can see the
    inputs.

    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
    former is tested.
    """

    if not (a == b):
        raise TestFailed("%r == %r" % (a, b))

def sortdict(dict):
    "Like repr(dict), but in sorted order."
    # sorted() works whether items() returns a list or any other iterable.
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas

def check_syntax_error(testcase, statement):
    """Fail 'testcase' unless compiling 'statement' raises SyntaxError."""
    try:
        compile(statement, '<test string>', 'exec')
    except SyntaxError:
        pass
    else:
        testcase.fail('Missing SyntaxError: "%s"' % statement)

def open_urlresource(url):
    """Return an open file for the resource named by the last path segment
    of 'url'.

    Requires the 'urlfetch' resource.  A file of that name in the current or
    parent directory is used if present; otherwise the URL is retrieved into
    a file of that name first.
    """
    import urllib, urlparse

    requires('urlfetch')
    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    for path in [os.path.curdir, os.path.pardir]:
        fn = os.path.join(path, filename)
        if os.path.exists(fn):
            return open(fn)

    # Progress message goes to the stdout the user can actually see.
    out = get_original_stdout()
    out.write('\tfetching %s ...' % url)
    out.write('\n')
    fn, _ = urllib.urlretrieve(url, filename)
    return open(fn)


class WarningMessage(object):
    "Holds the result of the latest showwarning() call"
    def __init__(self):
        self.message = None
        self.category = None
        self.filename = None
        self.lineno = None
        # Must exist from the start: __str__() reads it even when no
        # warning has been recorded yet.
        self.line = None

    def _showwarning(self, message, category, filename, lineno, file=None,
                        line=None):
        # Signature-compatible replacement for warnings.showwarning();
        # 'file' is accepted but not stored.
        self.message = message
        self.category = category
        self.filename = filename
        self.lineno = lineno
        self.line = line

    def reset(self):
        """Clear every recorded field back to None."""
        self._showwarning(*((None,)*6))

    def __str__(self):
        return ("{message : %r, category : %r, filename : %r, lineno : %s, "
                    "line : %r}" % (self.message,
                            self.category.__name__ if self.category else None,
                            self.filename, self.lineno, self.line))


@contextlib.contextmanager
def catch_warning(module=warnings):
    """
    Guard the warnings filter from being permanently changed and record the
    data of the last warning that has been issued.

    Use like this:

        with catch_warning() as w:
            warnings.warn("foo")
            assert str(w.message) == "foo"
    """
    warning_obj = WarningMessage()
    # Snapshot the filter list and showwarning hook so both can be restored.
    original_filters = module.filters[:]
    original_showwarning = module.showwarning
    module.showwarning = warning_obj._showwarning
    try:
        yield warning_obj
    finally:
        module.showwarning = original_showwarning
        module.filters = original_filters

class EnvironmentVarGuard(object):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager.

    The state a variable had when it was first touched through set() or
    unset() is what gets restored on exit, no matter how many times the
    variable is changed afterwards.
    """

    def __init__(self):
        self._environ = os.environ
        self._unset = set()     # variables that did not exist originally
        self._reset = dict()    # original values of variables that did

    def _remember(self, envvar):
        """Record envvar's current state unless it was already recorded."""
        if envvar in self._reset or envvar in self._unset:
            return
        if envvar in self._environ:
            self._reset[envvar] = self._environ[envvar]
        else:
            self._unset.add(envvar)

    def set(self, envvar, value):
        """Set envvar to value, remembering its original state."""
        self._remember(envvar)
        self._environ[envvar] = value

    def unset(self, envvar):
        """Remove envvar if present, remembering its original state."""
        self._remember(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for envvar, value in self._reset.items():
            self._environ[envvar] = value
        for unset in self._unset:
            # The variable may already be gone again; that is fine.
            if unset in self._environ:
                del self._environ[unset]

class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        if type_ is not None and issubclass(type_, self.exc):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                # Every requested attribute matched (or none was requested).
                raise ResourceDenied("an optional resource is not available")


def transient_internet():
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions."""
    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)


@contextlib.contextmanager
def captured_output(stream_name):
    """Run the 'with' statement body using a StringIO object in place of a
    specific attribute on the sys module.
    Example use (with 'stream_name=stdout')::

       with captured_stdout() as s:
           print "hello"
       assert s.getvalue() == "hello\\n"
    """
    import StringIO
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)

def captured_stdout():
    """Shorthand for captured_output("stdout")."""
    return captured_output("stdout")


#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

def run_with_locale(catstr, *locales):
    """Decorator factory: run the function with the first of 'locales' that
    can be set for the locale category named by 'catstr' (e.g. 'LC_ALL').

    The original locale is restored afterwards.  An invalid category name
    raises AttributeError when the decorated function is called; if the
    original locale cannot be retrieved for any other reason the function
    runs with the locale left untouched.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # This locale is not usable here; try the next one.
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G

# The maximum value an internal index can take.
MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse a memory limit such as '2.5G' or '4096Mb' and store it, in
    bytes, in the module-level max_memuse.

    The limit is a number followed by K, M, G or T (case-insensitive) and an
    optional 'b'.  Values above MAX_Py_ssize_t are clamped to it.  ValueError
    is raised for an unparsable limit or for one below 2G - 1 bytes.
    """
    import re
    global max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the blank inside the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit

def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.failIf(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        # Expose the decorator arguments on the wrapper.
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if verbose:
                sys.stderr.write("Skipping %s because of memory "
                                 "constraint\n" % (f.__name__,))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.

class BasicTestRunner:
    """Minimal runner: calls the test with a fresh unittest.TestResult and
    returns that result, producing no output of its own."""
    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result


def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Raises TestFailed if the run was not successful.  When there is exactly
    one error or exactly one failure its traceback text is the message;
    otherwise a generic message is used.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if not result.wasSuccessful():
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = "errors occurred; run in verbose mode for details"
        raise TestFailed(err)


def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestSuite/TestCase instance
    (added as is), or a str naming a module already in sys.modules, whose
    test cases are collected.  Any other str raises ValueError.
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(unittest.findTestCases(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed if any doctest fails.
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        verbosity = None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' % (module.__name__, t))
    return f, t

#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

def threading_setup():
    """Return the current sizes of threading._active and threading._limbo,
    for a later call to threading_cleanup()."""
    import threading
    return len(threading._active), len(threading._limbo)

def threading_cleanup(num_active, num_limbo):
    """Wait, in 0.1 second steps and at most _MAX_COUNT times per table,
    for threading._active and threading._limbo to return to the given
    sizes."""
    import threading
    import time

    _MAX_COUNT = 10
    count = 0
    while len(threading._active) != num_active and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)

    count = 0
    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)

def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # This will raise an exception on Windows.  That's ok.
                pid, status = os.waitpid(any_process, os.WNOHANG)
                if pid == 0:
                    break
            except Exception:
                # Best effort only: stop at the first failure.
                break