test_support.py revision d8f2d0bdb3d3b9abe53cda9add979e2fc4be7da0
1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.test_support': 4 raise ImportError('test_support must be imported from the test package') 5 6import contextlib 7import errno 8import socket 9import sys 10import os 11import os.path 12import shutil 13import warnings 14import unittest 15 16class Error(Exception): 17 """Base class for regression test exceptions.""" 18 19class TestFailed(Error): 20 """Test failed.""" 21 22class TestSkipped(Error): 23 """Test skipped. 24 25 This can be raised to indicate that a test was deliberatly 26 skipped, but not because a feature wasn't available. For 27 example, if some resource can't be used, such as the network 28 appears to be unavailable, this should be raised instead of 29 TestFailed. 30 """ 31 32class ResourceDenied(TestSkipped): 33 """Test skipped because it requested a disallowed resource. 34 35 This is raised when a test calls requires() for a resource that 36 has not be enabled. It is used to distinguish between expected 37 and unexpected skips. 38 """ 39 40verbose = 1 # Flag set to 0 by regrtest.py 41use_resources = None # Flag set to [] by regrtest.py 42max_memuse = 0 # Disable bigmem tests (they will still be run with 43 # small sizes, to make sure they work.) 44 45# _original_stdout is meant to hold stdout at the time regrtest began. 46# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 47# The point is to have some flavor of stdout the user can actually see. 48_original_stdout = None 49def record_original_stdout(stdout): 50 global _original_stdout 51 _original_stdout = stdout 52 53def get_original_stdout(): 54 return _original_stdout or sys.stdout 55 56def unload(name): 57 try: 58 del sys.modules[name] 59 except KeyError: 60 pass 61 62def unlink(filename): 63 try: 64 os.unlink(filename) 65 except OSError: 66 pass 67 68def rmtree(path): 69 try: 70 shutil.rmtree(path) 71 except OSError, e: 72 # Unix returns ENOENT, Windows returns ESRCH. 73 if e.errno not in (errno.ENOENT, errno.ESRCH): 74 raise 75 76def forget(modname): 77 '''"Forget" a module was ever imported by removing it from sys.modules and 78 deleting any .pyc and .pyo files.''' 79 unload(modname) 80 for dirname in sys.path: 81 unlink(os.path.join(dirname, modname + os.extsep + 'pyc')) 82 # Deleting the .pyo file cannot be within the 'try' for the .pyc since 83 # the chance exists that there is no .pyc (and thus the 'try' statement 84 # is exited) but there is a .pyo file. 85 unlink(os.path.join(dirname, modname + os.extsep + 'pyo')) 86 87def is_resource_enabled(resource): 88 """Test whether a resource is enabled. Known resources are set by 89 regrtest.py.""" 90 return use_resources is not None and resource in use_resources 91 92def requires(resource, msg=None): 93 """Raise ResourceDenied if the specified resource is not available. 94 95 If the caller's module is __main__ then automatically return True. The 96 possibility of False being returned occurs when regrtest.py is executing.""" 97 # see if the caller's module is __main__ - if so, treat as if 98 # the resource was set 99 if sys._getframe().f_back.f_globals.get("__name__") == "__main__": 100 return 101 if not is_resource_enabled(resource): 102 if msg is None: 103 msg = "Use of the `%s' resource not enabled" % resource 104 raise ResourceDenied(msg) 105 106HOST = 'localhost' 107 108def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): 109 """Returns an unused port that should be suitable for binding. This is 110 achieved by creating a temporary socket with the same family and type as 111 the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to 112 the specified host address (defaults to 0.0.0.0) with the port set to 0, 113 eliciting an unused ephemeral port from the OS. The temporary socket is 114 then closed and deleted, and the ephemeral port is returned. 115 116 Either this method or bind_port() should be used for any tests where a 117 server socket needs to be bound to a particular port for the duration of 118 the test. Which one to use depends on whether the calling code is creating 119 a python socket, or if an unused port needs to be provided in a constructor 120 or passed to an external program (i.e. the -accept argument to openssl's 121 s_server mode). Always prefer bind_port() over find_unused_port() where 122 possible. Hard coded ports should *NEVER* be used. As soon as a server 123 socket is bound to a hard coded port, the ability to run multiple instances 124 of the test simultaneously on the same host is compromised, which makes the 125 test a ticking time bomb in a buildbot environment. On Unix buildbots, this 126 may simply manifest as a failed test, which can be recovered from without 127 intervention in most cases, but on Windows, the entire python process can 128 completely and utterly wedge, requiring someone to log in to the buildbot 129 and manually kill the affected process. 130 131 (This is easy to reproduce on Windows, unfortunately, and can be traced to 132 the SO_REUSEADDR socket option having different semantics on Windows versus 133 Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, 134 listen and then accept connections on identical host/ports. An EADDRINUSE 135 socket.error will be raised at some point (depending on the platform and 136 the order bind and listen were called on each socket). 137 138 However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE 139 will ever be raised when attempting to bind two identical host/ports. When 140 accept() is called on each socket, the second caller's process will steal 141 the port from the first caller, leaving them both in an awkwardly wedged 142 state where they'll no longer respond to any signals or graceful kills, and 143 must be forcibly killed via OpenProcess()/TerminateProcess(). 144 145 The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option 146 instead of SO_REUSEADDR, which effectively affords the same semantics as 147 SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open 148 Source world compared to Windows ones, this is a common mistake. A quick 149 look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when 150 openssl.exe is called with the 's_server' option, for example. See 151 http://bugs.python.org/issue2550 for more info. The following site also 152 has a very thorough description about the implications of both REUSEADDR 153 and EXCLUSIVEADDRUSE on Windows: 154 http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) 155 156 XXX: although this approach is a vast improvement on previous attempts to 157 elicit unused ports, it rests heavily on the assumption that the ephemeral 158 port returned to us by the OS won't immediately be dished back out to some 159 other process when we close and delete our temporary socket but before our 160 calling code has a chance to bind the returned port. We can deal with this 161 issue if/when we come across it.""" 162 tempsock = socket.socket(family, socktype) 163 port = bind_port(tempsock) 164 tempsock.close() 165 del tempsock 166 return port 167 168def bind_port(sock, host=HOST): 169 """Bind the socket to a free port and return the port number. Relies on 170 ephemeral ports in order to ensure we are using an unbound port. This is 171 important as many tests may be running simultaneously, especially in a 172 buildbot environment. This method raises an exception if the sock.family 173 is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR 174 or SO_REUSEPORT set on it. Tests should *never* set these socket options 175 for TCP/IP sockets. The only case for setting these options is testing 176 multicasting via multiple UDP sockets. 177 178 Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. 179 on Windows), it will be set on the socket. This will prevent anyone else 180 from bind()'ing to our host/port for the duration of the test. 181 """ 182 if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: 183 if hasattr(socket, 'SO_REUSEADDR'): 184 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: 185 raise TestFailed("tests should never set the SO_REUSEADDR " \ 186 "socket option on TCP/IP sockets!") 187 if hasattr(socket, 'SO_REUSEPORT'): 188 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: 189 raise TestFailed("tests should never set the SO_REUSEPORT " \ 190 "socket option on TCP/IP sockets!") 191 if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): 192 sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) 193 194 sock.bind((host, 0)) 195 port = sock.getsockname()[1] 196 return port 197 198FUZZ = 1e-6 199 200def fcmp(x, y): # fuzzy comparison function 201 if type(x) == type(0.0) or type(y) == type(0.0): 202 try: 203 x, y = coerce(x, y) 204 fuzz = (abs(x) + abs(y)) * FUZZ 205 if abs(x-y) <= fuzz: 206 return 0 207 except: 208 pass 209 elif type(x) == type(y) and type(x) in (type(()), type([])): 210 for i in range(min(len(x), len(y))): 211 outcome = fcmp(x[i], y[i]) 212 if outcome != 0: 213 return outcome 214 return cmp(len(x), len(y)) 215 return cmp(x, y) 216 217try: 218 unicode 219 have_unicode = True 220except NameError: 221 have_unicode = False 222 223is_jython = sys.platform.startswith('java') 224 225# Filename used for testing 226if os.name == 'java': 227 # Jython disallows @ in module names 228 TESTFN = '$test' 229elif os.name == 'riscos': 230 TESTFN = 'testfile' 231else: 232 TESTFN = '@test' 233 # Unicode name only used if TEST_FN_ENCODING exists for the platform. 234 if have_unicode: 235 # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() 236 # TESTFN_UNICODE is a filename that can be encoded using the 237 # file system encoding, but *not* with the default (ascii) encoding 238 if isinstance('', unicode): 239 # python -U 240 # XXX perhaps unicode() should accept Unicode strings? 241 TESTFN_UNICODE = "@test-\xe0\xf2" 242 else: 243 # 2 latin characters. 244 TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1") 245 TESTFN_ENCODING = sys.getfilesystemencoding() 246 # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be 247 # able to be encoded by *either* the default or filesystem encoding. 248 # This test really only makes sense on Windows NT platforms 249 # which have special Unicode support in posixmodule. 250 if (not hasattr(sys, "getwindowsversion") or 251 sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME 252 TESTFN_UNICODE_UNENCODEABLE = None 253 else: 254 # Japanese characters (I think - from bug 846133) 255 TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"') 256 try: 257 # XXX - Note - should be using TESTFN_ENCODING here - but for 258 # Windows, "mbcs" currently always operates as if in 259 # errors=ignore' mode - hence we get '?' characters rather than 260 # the exception. 'Latin1' operates as we expect - ie, fails. 261 # See [ 850997 ] mbcs encoding ignores errors 262 TESTFN_UNICODE_UNENCODEABLE.encode("Latin1") 263 except UnicodeEncodeError: 264 pass 265 else: 266 print \ 267 'WARNING: The filename %r CAN be encoded by the filesystem. ' \ 268 'Unicode filename tests may not be effective' \ 269 % TESTFN_UNICODE_UNENCODEABLE 270 271# Make sure we can write to TESTFN, try in /tmp if we can't 272fp = None 273try: 274 fp = open(TESTFN, 'w+') 275except IOError: 276 TMP_TESTFN = os.path.join('/tmp', TESTFN) 277 try: 278 fp = open(TMP_TESTFN, 'w+') 279 TESTFN = TMP_TESTFN 280 del TMP_TESTFN 281 except IOError: 282 print ('WARNING: tests will fail, unable to write to: %s or %s' % 283 (TESTFN, TMP_TESTFN)) 284if fp is not None: 285 fp.close() 286 unlink(TESTFN) 287del fp 288 289def findfile(file, here=__file__): 290 """Try to find a file on sys.path and the working directory. If it is not 291 found the argument passed to the function is returned (this does not 292 necessarily signal failure; could still be the legitimate path).""" 293 if os.path.isabs(file): 294 return file 295 path = sys.path 296 path = [os.path.dirname(here)] + path 297 for dn in path: 298 fn = os.path.join(dn, file) 299 if os.path.exists(fn): return fn 300 return file 301 302def verify(condition, reason='test failed'): 303 """Verify that condition is true. If not, raise TestFailed. 304 305 The optional argument reason can be given to provide 306 a better error text. 307 """ 308 309 if not condition: 310 raise TestFailed(reason) 311 312def vereq(a, b): 313 """Raise TestFailed if a == b is false. 314 315 This is better than verify(a == b) because, in case of failure, the 316 error message incorporates repr(a) and repr(b) so you can see the 317 inputs. 318 319 Note that "not (a == b)" isn't necessarily the same as "a != b"; the 320 former is tested. 321 """ 322 323 if not (a == b): 324 raise TestFailed("%r == %r" % (a, b)) 325 326def sortdict(dict): 327 "Like repr(dict), but in sorted order." 328 items = dict.items() 329 items.sort() 330 reprpairs = ["%r: %r" % pair for pair in items] 331 withcommas = ", ".join(reprpairs) 332 return "{%s}" % withcommas 333 334def check_syntax_error(testcase, statement): 335 try: 336 compile(statement, '<test string>', 'exec') 337 except SyntaxError: 338 pass 339 else: 340 testcase.fail('Missing SyntaxError: "%s"' % statement) 341 342def open_urlresource(url): 343 import urllib, urlparse 344 345 requires('urlfetch') 346 filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 347 348 for path in [os.path.curdir, os.path.pardir]: 349 fn = os.path.join(path, filename) 350 if os.path.exists(fn): 351 return open(fn) 352 353 print >> get_original_stdout(), '\tfetching %s ...' % url 354 fn, _ = urllib.urlretrieve(url, filename) 355 return open(fn) 356 357 358class WarningMessage(object): 359 "Holds the result of the latest showwarning() call" 360 def __init__(self): 361 self.message = None 362 self.category = None 363 self.filename = None 364 self.lineno = None 365 366 def _showwarning(self, message, category, filename, lineno, file=None, 367 line=None): 368 self.message = message 369 self.category = category 370 self.filename = filename 371 self.lineno = lineno 372 self.line = line 373 374 def reset(self): 375 self._showwarning(*((None,)*6)) 376 377 def __str__(self): 378 return ("{message : %r, category : %r, filename : %r, lineno : %s, " 379 "line : %r}" % (self.message, 380 self.category.__name__ if self.category else None, 381 self.filename, self.lineno, self.line)) 382 383 384@contextlib.contextmanager 385def catch_warning(module=warnings): 386 """ 387 Guard the warnings filter from being permanently changed and record the 388 data of the last warning that has been issued. 389 390 Use like this: 391 392 with catch_warning() as w: 393 warnings.warn("foo") 394 assert str(w.message) == "foo" 395 """ 396 warning_obj = WarningMessage() 397 original_filters = module.filters[:] 398 original_showwarning = module.showwarning 399 module.showwarning = warning_obj._showwarning 400 try: 401 yield warning_obj 402 finally: 403 module.showwarning = original_showwarning 404 module.filters = original_filters 405 406class EnvironmentVarGuard(object): 407 408 """Class to help protect the environment variable properly. Can be used as 409 a context manager.""" 410 411 def __init__(self): 412 self._environ = os.environ 413 self._unset = set() 414 self._reset = dict() 415 416 def set(self, envvar, value): 417 if envvar not in self._environ: 418 self._unset.add(envvar) 419 else: 420 self._reset[envvar] = self._environ[envvar] 421 self._environ[envvar] = value 422 423 def unset(self, envvar): 424 if envvar in self._environ: 425 self._reset[envvar] = self._environ[envvar] 426 del self._environ[envvar] 427 428 def __enter__(self): 429 return self 430 431 def __exit__(self, *ignore_exc): 432 for envvar, value in self._reset.iteritems(): 433 self._environ[envvar] = value 434 for unset in self._unset: 435 del self._environ[unset] 436 437class TransientResource(object): 438 439 """Raise ResourceDenied if an exception is raised while the context manager 440 is in effect that matches the specified exception and attributes.""" 441 442 def __init__(self, exc, **kwargs): 443 self.exc = exc 444 self.attrs = kwargs 445 446 def __enter__(self): 447 return self 448 449 def __exit__(self, type_=None, value=None, traceback=None): 450 """If type_ is a subclass of self.exc and value has attributes matching 451 self.attrs, raise ResourceDenied. Otherwise let the exception 452 propagate (if any).""" 453 if type_ is not None and issubclass(self.exc, type_): 454 for attr, attr_value in self.attrs.iteritems(): 455 if not hasattr(value, attr): 456 break 457 if getattr(value, attr) != attr_value: 458 break 459 else: 460 raise ResourceDenied("an optional resource is not available") 461 462 463def transient_internet(): 464 """Return a context manager that raises ResourceDenied when various issues 465 with the Internet connection manifest themselves as exceptions.""" 466 time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) 467 socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) 468 ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) 469 return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset) 470 471 472@contextlib.contextmanager 473def captured_output(stream_name): 474 """Run the 'with' statement body using a StringIO object in place of a 475 specific attribute on the sys module. 476 Example use (with 'stream_name=stdout'):: 477 478 with captured_stdout() as s: 479 print "hello" 480 assert s.getvalue() == "hello" 481 """ 482 import StringIO 483 orig_stdout = getattr(sys, stream_name) 484 setattr(sys, stream_name, StringIO.StringIO()) 485 try: 486 yield getattr(sys, stream_name) 487 finally: 488 setattr(sys, stream_name, orig_stdout) 489 490def captured_stdout(): 491 return captured_output("stdout") 492 493 494#======================================================================= 495# Decorator for running a function in a different locale, correctly resetting 496# it afterwards. 497 498def run_with_locale(catstr, *locales): 499 def decorator(func): 500 def inner(*args, **kwds): 501 try: 502 import locale 503 category = getattr(locale, catstr) 504 orig_locale = locale.setlocale(category) 505 except AttributeError: 506 # if the test author gives us an invalid category string 507 raise 508 except: 509 # cannot retrieve original locale, so do nothing 510 locale = orig_locale = None 511 else: 512 for loc in locales: 513 try: 514 locale.setlocale(category, loc) 515 break 516 except: 517 pass 518 519 # now run the function, resetting the locale on exceptions 520 try: 521 return func(*args, **kwds) 522 finally: 523 if locale and orig_locale: 524 locale.setlocale(category, orig_locale) 525 inner.func_name = func.func_name 526 inner.__doc__ = func.__doc__ 527 return inner 528 return decorator 529 530#======================================================================= 531# Big-memory-test support. Separate from 'resources' because memory use should be configurable. 532 533# Some handy shorthands. Note that these are used for byte-limits as well 534# as size-limits, in the various bigmem tests 535_1M = 1024*1024 536_1G = 1024 * _1M 537_2G = 2 * _1G 538 539# Hack to get at the maximum value an internal index can take. 540class _Dummy: 541 def __getslice__(self, i, j): 542 return j 543MAX_Py_ssize_t = _Dummy()[:] 544 545def set_memlimit(limit): 546 import re 547 global max_memuse 548 sizes = { 549 'k': 1024, 550 'm': _1M, 551 'g': _1G, 552 't': 1024*_1G, 553 } 554 m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, 555 re.IGNORECASE | re.VERBOSE) 556 if m is None: 557 raise ValueError('Invalid memory limit %r' % (limit,)) 558 memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) 559 if memlimit > MAX_Py_ssize_t: 560 memlimit = MAX_Py_ssize_t 561 if memlimit < _2G - 1: 562 raise ValueError('Memory limit %r too low to be useful' % (limit,)) 563 max_memuse = memlimit 564 565def bigmemtest(minsize, memuse, overhead=5*_1M): 566 """Decorator for bigmem tests. 567 568 'minsize' is the minimum useful size for the test (in arbitrary, 569 test-interpreted units.) 'memuse' is the number of 'bytes per size' for 570 the test, or a good estimate of it. 'overhead' specifies fixed overhead, 571 independent of the testsize, and defaults to 5Mb. 572 573 The decorator tries to guess a good value for 'size' and passes it to 574 the decorated test function. If minsize * memuse is more than the 575 allowed memory use (as defined by max_memuse), the test is skipped. 576 Otherwise, minsize is adjusted upward to use up to max_memuse. 577 """ 578 def decorator(f): 579 def wrapper(self): 580 if not max_memuse: 581 # If max_memuse is 0 (the default), 582 # we still want to run the tests with size set to a few kb, 583 # to make sure they work. We still want to avoid using 584 # too much memory, though, but we do that noisily. 585 maxsize = 5147 586 self.failIf(maxsize * memuse + overhead > 20 * _1M) 587 else: 588 maxsize = int((max_memuse - overhead) / memuse) 589 if maxsize < minsize: 590 # Really ought to print 'test skipped' or something 591 if verbose: 592 sys.stderr.write("Skipping %s because of memory " 593 "constraint\n" % (f.__name__,)) 594 return 595 # Try to keep some breathing room in memory use 596 maxsize = max(maxsize - 50 * _1M, minsize) 597 return f(self, maxsize) 598 wrapper.minsize = minsize 599 wrapper.memuse = memuse 600 wrapper.overhead = overhead 601 return wrapper 602 return decorator 603 604def bigaddrspacetest(f): 605 """Decorator for tests that fill the address space.""" 606 def wrapper(self): 607 if max_memuse < MAX_Py_ssize_t: 608 if verbose: 609 sys.stderr.write("Skipping %s because of memory " 610 "constraint\n" % (f.__name__,)) 611 else: 612 return f(self) 613 return wrapper 614 615#======================================================================= 616# unittest integration. 617 618class BasicTestRunner: 619 def run(self, test): 620 result = unittest.TestResult() 621 test(result) 622 return result 623 624 625def _run_suite(suite): 626 """Run tests from a unittest.TestSuite-derived class.""" 627 if verbose: 628 runner = unittest.TextTestRunner(sys.stdout, verbosity=2) 629 else: 630 runner = BasicTestRunner() 631 632 result = runner.run(suite) 633 if not result.wasSuccessful(): 634 if len(result.errors) == 1 and not result.failures: 635 err = result.errors[0][1] 636 elif len(result.failures) == 1 and not result.errors: 637 err = result.failures[0][1] 638 else: 639 err = "errors occurred; run in verbose mode for details" 640 raise TestFailed(err) 641 642 643def run_unittest(*classes): 644 """Run tests from unittest.TestCase-derived classes.""" 645 valid_types = (unittest.TestSuite, unittest.TestCase) 646 suite = unittest.TestSuite() 647 for cls in classes: 648 if isinstance(cls, str): 649 if cls in sys.modules: 650 suite.addTest(unittest.findTestCases(sys.modules[cls])) 651 else: 652 raise ValueError("str arguments must be keys in sys.modules") 653 elif isinstance(cls, valid_types): 654 suite.addTest(cls) 655 else: 656 suite.addTest(unittest.makeSuite(cls)) 657 _run_suite(suite) 658 659 660#======================================================================= 661# doctest driver. 662 663def run_doctest(module, verbosity=None): 664 """Run doctest on the given module. Return (#failures, #tests). 665 666 If optional argument verbosity is not specified (or is None), pass 667 test_support's belief about verbosity on to doctest. Else doctest's 668 usual behavior is used (it searches sys.argv for -v). 669 """ 670 671 import doctest 672 673 if verbosity is None: 674 verbosity = verbose 675 else: 676 verbosity = None 677 678 # Direct doctest output (normally just errors) to real stdout; doctest 679 # output shouldn't be compared by regrtest. 680 save_stdout = sys.stdout 681 sys.stdout = get_original_stdout() 682 try: 683 f, t = doctest.testmod(module, verbose=verbosity) 684 if f: 685 raise TestFailed("%d of %d doctests failed" % (f, t)) 686 finally: 687 sys.stdout = save_stdout 688 if verbose: 689 print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t) 690 return f, t 691 692#======================================================================= 693# Threading support to prevent reporting refleaks when running regrtest.py -R 694 695def threading_setup(): 696 import threading 697 return len(threading._active), len(threading._limbo) 698 699def threading_cleanup(num_active, num_limbo): 700 import threading 701 import time 702 703 _MAX_COUNT = 10 704 count = 0 705 while len(threading._active) != num_active and count < _MAX_COUNT: 706 count += 1 707 time.sleep(0.1) 708 709 count = 0 710 while len(threading._limbo) != num_limbo and count < _MAX_COUNT: 711 count += 1 712 time.sleep(0.1) 713 714def reap_children(): 715 """Use this function at the end of test_main() whenever sub-processes 716 are started. This will help ensure that no extra children (zombies) 717 stick around to hog resources and create problems when looking 718 for refleaks. 719 """ 720 721 # Reap all our dead child processes so we don't leave zombies around. 722 # These hog resources and might be causing some of the buildbots to die. 723 if hasattr(os, 'waitpid'): 724 any_process = -1 725 while True: 726 try: 727 # This will raise an exception on Windows. That's ok. 728 pid, status = os.waitpid(any_process, os.WNOHANG) 729 if pid == 0: 730 break 731 except: 732 break 733