test_support.py revision 6de9e938a5f70128a77c589b3ec40936f6335c1c
"""Supporting definitions for the Python regression tests."""

if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')

import contextlib
import errno
import functools
import gc
import socket
import sys
import os
import platform
import shutil
import warnings
import unittest
import importlib
import UserDict
import re

__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
           "verbose", "use_resources", "max_memuse", "record_original_stdout",
           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
           "open_urlresource", "check_warnings", "check_py3k_warnings",
           "CleanImport", "EnvironmentVarGuard", "captured_output",
           "captured_stdout", "TransientResource", "transient_internet",
           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
           "threading_cleanup", "reap_children", "cpython_only",
           "check_impl_detail", "get_attribute", "py3k_bytes"]

class Error(Exception):
    """Base class for regression test exceptions."""

class TestFailed(Error):
    """Test failed."""

class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled. It is used to distinguish between expected
    and unexpected skips.
    """

@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager to suppress package and module deprecation
    warnings when importing them.

    If ignore is False, this context manager has no effect."""
    if ignore:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", ".+ (module|package)",
                                    DeprecationWarning)
            yield
    else:
        yield


def import_module(name, deprecated=False):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        except ImportError as msg:
            raise unittest.SkipTest(str(msg))


def _save_and_remove_module(name, orig_modules):
    """Helper function to save and remove a module from sys.modules

    The module object is stored in orig_modules under its name before it
    is deleted from sys.modules.

    Return value is True if the module was in sys.modules and
    False otherwise."""
    saved = True
    try:
        orig_modules[name] = sys.modules[name]
    except KeyError:
        saved = False
    else:
        del sys.modules[name]
    return saved


def _save_and_block_module(name, orig_modules):
    """Helper function to save and block a module in sys.modules

    The existing entry (if any) is stored in orig_modules, then the
    sys.modules entry is overwritten with 0 whether or not one existed.

    Return value is True if the module was in sys.modules and
    False otherwise."""
    saved = True
    try:
        orig_modules[name] = sys.modules[name]
    except KeyError:
        saved = False
    sys.modules[name] = 0
    return saved


def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Imports and returns a module, deliberately bypassing the sys.modules cache
    and importing a fresh copy of the module. Once the import is complete,
    the sys.modules cache is restored to its original state.

    Modules named in fresh are also imported anew if needed by the import.

    Importing of modules named in blocked is prevented while the fresh import
    takes place.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    # NOTE: test_heapq and test_warnings include extra sanity checks to make
    # sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # Keep track of modules saved for later restoration as well
        # as those which just need a blocking entry removed
        orig_modules = {}
        names_to_remove = []
        _save_and_remove_module(name, orig_modules)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, orig_modules)
            for blocked_name in blocked:
                if not _save_and_block_module(blocked_name, orig_modules):
                    names_to_remove.append(blocked_name)
            fresh_module = importlib.import_module(name)
        finally:
            # Put back every entry that was saved, then drop the blocking
            # placeholders for names that had no entry to begin with.
            for orig_name, module in orig_modules.items():
                sys.modules[orig_name] = module
            for name_to_remove in names_to_remove:
                del sys.modules[name_to_remove]
        return fresh_module


def get_attribute(obj, name):
    """Get an attribute, raising SkipTest if AttributeError is raised."""
    try:
        attribute = getattr(obj, name)
    except AttributeError:
        # Fall back to the object itself when it has no __name__, so that
        # building the message cannot raise a second AttributeError and
        # mask the intended SkipTest.
        raise unittest.SkipTest("module %s has no attribute %s" % (
            getattr(obj, "__name__", obj), name))
    else:
        return attribute


verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
161_original_stdout = None 162def record_original_stdout(stdout): 163 global _original_stdout 164 _original_stdout = stdout 165 166def get_original_stdout(): 167 return _original_stdout or sys.stdout 168 169def unload(name): 170 try: 171 del sys.modules[name] 172 except KeyError: 173 pass 174 175def unlink(filename): 176 try: 177 os.unlink(filename) 178 except OSError: 179 pass 180 181def rmtree(path): 182 try: 183 shutil.rmtree(path) 184 except OSError, e: 185 # Unix returns ENOENT, Windows returns ESRCH. 186 if e.errno not in (errno.ENOENT, errno.ESRCH): 187 raise 188 189def forget(modname): 190 '''"Forget" a module was ever imported by removing it from sys.modules and 191 deleting any .pyc and .pyo files.''' 192 unload(modname) 193 for dirname in sys.path: 194 unlink(os.path.join(dirname, modname + os.extsep + 'pyc')) 195 # Deleting the .pyo file cannot be within the 'try' for the .pyc since 196 # the chance exists that there is no .pyc (and thus the 'try' statement 197 # is exited) but there is a .pyo file. 198 unlink(os.path.join(dirname, modname + os.extsep + 'pyo')) 199 200def is_resource_enabled(resource): 201 """Test whether a resource is enabled. Known resources are set by 202 regrtest.py.""" 203 return use_resources is not None and resource in use_resources 204 205def requires(resource, msg=None): 206 """Raise ResourceDenied if the specified resource is not available. 207 208 If the caller's module is __main__ then automatically return True. 
The 209 possibility of False being returned occurs when regrtest.py is executing.""" 210 # see if the caller's module is __main__ - if so, treat as if 211 # the resource was set 212 if sys._getframe(1).f_globals.get("__name__") == "__main__": 213 return 214 if not is_resource_enabled(resource): 215 if msg is None: 216 msg = "Use of the `%s' resource not enabled" % resource 217 raise ResourceDenied(msg) 218 219HOST = 'localhost' 220 221def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): 222 """Returns an unused port that should be suitable for binding. This is 223 achieved by creating a temporary socket with the same family and type as 224 the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to 225 the specified host address (defaults to 0.0.0.0) with the port set to 0, 226 eliciting an unused ephemeral port from the OS. The temporary socket is 227 then closed and deleted, and the ephemeral port is returned. 228 229 Either this method or bind_port() should be used for any tests where a 230 server socket needs to be bound to a particular port for the duration of 231 the test. Which one to use depends on whether the calling code is creating 232 a python socket, or if an unused port needs to be provided in a constructor 233 or passed to an external program (i.e. the -accept argument to openssl's 234 s_server mode). Always prefer bind_port() over find_unused_port() where 235 possible. Hard coded ports should *NEVER* be used. As soon as a server 236 socket is bound to a hard coded port, the ability to run multiple instances 237 of the test simultaneously on the same host is compromised, which makes the 238 test a ticking time bomb in a buildbot environment. 
On Unix buildbots, this 239 may simply manifest as a failed test, which can be recovered from without 240 intervention in most cases, but on Windows, the entire python process can 241 completely and utterly wedge, requiring someone to log in to the buildbot 242 and manually kill the affected process. 243 244 (This is easy to reproduce on Windows, unfortunately, and can be traced to 245 the SO_REUSEADDR socket option having different semantics on Windows versus 246 Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, 247 listen and then accept connections on identical host/ports. An EADDRINUSE 248 socket.error will be raised at some point (depending on the platform and 249 the order bind and listen were called on each socket). 250 251 However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE 252 will ever be raised when attempting to bind two identical host/ports. When 253 accept() is called on each socket, the second caller's process will steal 254 the port from the first caller, leaving them both in an awkwardly wedged 255 state where they'll no longer respond to any signals or graceful kills, and 256 must be forcibly killed via OpenProcess()/TerminateProcess(). 257 258 The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option 259 instead of SO_REUSEADDR, which effectively affords the same semantics as 260 SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open 261 Source world compared to Windows ones, this is a common mistake. A quick 262 look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when 263 openssl.exe is called with the 's_server' option, for example. See 264 http://bugs.python.org/issue2550 for more info. 
The following site also 265 has a very thorough description about the implications of both REUSEADDR 266 and EXCLUSIVEADDRUSE on Windows: 267 http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) 268 269 XXX: although this approach is a vast improvement on previous attempts to 270 elicit unused ports, it rests heavily on the assumption that the ephemeral 271 port returned to us by the OS won't immediately be dished back out to some 272 other process when we close and delete our temporary socket but before our 273 calling code has a chance to bind the returned port. We can deal with this 274 issue if/when we come across it.""" 275 tempsock = socket.socket(family, socktype) 276 port = bind_port(tempsock) 277 tempsock.close() 278 del tempsock 279 return port 280 281def bind_port(sock, host=HOST): 282 """Bind the socket to a free port and return the port number. Relies on 283 ephemeral ports in order to ensure we are using an unbound port. This is 284 important as many tests may be running simultaneously, especially in a 285 buildbot environment. This method raises an exception if the sock.family 286 is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR 287 or SO_REUSEPORT set on it. Tests should *never* set these socket options 288 for TCP/IP sockets. The only case for setting these options is testing 289 multicasting via multiple UDP sockets. 290 291 Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. 292 on Windows), it will be set on the socket. This will prevent anyone else 293 from bind()'ing to our host/port for the duration of the test. 
294 """ 295 if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: 296 if hasattr(socket, 'SO_REUSEADDR'): 297 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: 298 raise TestFailed("tests should never set the SO_REUSEADDR " \ 299 "socket option on TCP/IP sockets!") 300 if hasattr(socket, 'SO_REUSEPORT'): 301 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: 302 raise TestFailed("tests should never set the SO_REUSEPORT " \ 303 "socket option on TCP/IP sockets!") 304 if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): 305 sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) 306 307 sock.bind((host, 0)) 308 port = sock.getsockname()[1] 309 return port 310 311FUZZ = 1e-6 312 313def fcmp(x, y): # fuzzy comparison function 314 if isinstance(x, float) or isinstance(y, float): 315 try: 316 fuzz = (abs(x) + abs(y)) * FUZZ 317 if abs(x-y) <= fuzz: 318 return 0 319 except: 320 pass 321 elif type(x) == type(y) and isinstance(x, (tuple, list)): 322 for i in range(min(len(x), len(y))): 323 outcome = fcmp(x[i], y[i]) 324 if outcome != 0: 325 return outcome 326 return (len(x) > len(y)) - (len(x) < len(y)) 327 return (x > y) - (x < y) 328 329try: 330 unicode 331 have_unicode = True 332except NameError: 333 have_unicode = False 334 335is_jython = sys.platform.startswith('java') 336 337# Filename used for testing 338if os.name == 'java': 339 # Jython disallows @ in module names 340 TESTFN = '$test' 341elif os.name == 'riscos': 342 TESTFN = 'testfile' 343else: 344 TESTFN = '@test' 345 # Unicode name only used if TEST_FN_ENCODING exists for the platform. 346 if have_unicode: 347 # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() 348 # TESTFN_UNICODE is a filename that can be encoded using the 349 # file system encoding, but *not* with the default (ascii) encoding 350 if isinstance('', unicode): 351 # python -U 352 # XXX perhaps unicode() should accept Unicode strings? 
353 TESTFN_UNICODE = "@test-\xe0\xf2" 354 else: 355 # 2 latin characters. 356 TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1") 357 TESTFN_ENCODING = sys.getfilesystemencoding() 358 # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be 359 # able to be encoded by *either* the default or filesystem encoding. 360 # This test really only makes sense on Windows NT platforms 361 # which have special Unicode support in posixmodule. 362 if (not hasattr(sys, "getwindowsversion") or 363 sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME 364 TESTFN_UNICODE_UNENCODEABLE = None 365 else: 366 # Japanese characters (I think - from bug 846133) 367 TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"') 368 try: 369 # XXX - Note - should be using TESTFN_ENCODING here - but for 370 # Windows, "mbcs" currently always operates as if in 371 # errors=ignore' mode - hence we get '?' characters rather than 372 # the exception. 'Latin1' operates as we expect - ie, fails. 373 # See [ 850997 ] mbcs encoding ignores errors 374 TESTFN_UNICODE_UNENCODEABLE.encode("Latin1") 375 except UnicodeEncodeError: 376 pass 377 else: 378 print \ 379 'WARNING: The filename %r CAN be encoded by the filesystem. ' \ 380 'Unicode filename tests may not be effective' \ 381 % TESTFN_UNICODE_UNENCODEABLE 382 383 384# Disambiguate TESTFN for parallel testing, while letting it remain a valid 385# module name. 386TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) 387 388# Save the initial cwd 389SAVEDCWD = os.getcwd() 390 391@contextlib.contextmanager 392def temp_cwd(name='tempcwd', quiet=False): 393 """ 394 Context manager that creates a temporary directory and set it as CWD. 395 396 The new CWD is created in the current directory and it's named *name*. 397 If *quiet* is False (default) and it's not possible to create or change 398 the CWD, an error is raised. If it's True, only a warning is raised 399 and the original CWD is used. 
400 """ 401 if isinstance(name, unicode): 402 try: 403 name = name.encode(sys.getfilesystemencoding() or 'ascii') 404 except UnicodeEncodeError: 405 if not quiet: 406 raise unittest.SkipTest('unable to encode the cwd name with ' 407 'the filesystem encoding.') 408 saved_dir = os.getcwd() 409 is_temporary = False 410 try: 411 os.mkdir(name) 412 os.chdir(name) 413 is_temporary = True 414 except OSError: 415 if not quiet: 416 raise 417 warnings.warn('tests may fail, unable to change the CWD to ' + name, 418 RuntimeWarning, stacklevel=3) 419 try: 420 yield os.getcwd() 421 finally: 422 os.chdir(saved_dir) 423 if is_temporary: 424 rmtree(name) 425 426 427def findfile(file, here=__file__): 428 """Try to find a file on sys.path and the working directory. If it is not 429 found the argument passed to the function is returned (this does not 430 necessarily signal failure; could still be the legitimate path).""" 431 if os.path.isabs(file): 432 return file 433 path = sys.path 434 path = [os.path.dirname(here)] + path 435 for dn in path: 436 fn = os.path.join(dn, file) 437 if os.path.exists(fn): return fn 438 return file 439 440def sortdict(dict): 441 "Like repr(dict), but in sorted order." 442 items = dict.items() 443 items.sort() 444 reprpairs = ["%r: %r" % pair for pair in items] 445 withcommas = ", ".join(reprpairs) 446 return "{%s}" % withcommas 447 448def make_bad_fd(): 449 """ 450 Create an invalid file descriptor by opening and closing a file and return 451 its fd. 452 """ 453 file = open(TESTFN, "wb") 454 try: 455 return file.fileno() 456 finally: 457 file.close() 458 unlink(TESTFN) 459 460def check_syntax_error(testcase, statement): 461 testcase.assertRaises(SyntaxError, compile, statement, 462 '<test string>', 'exec') 463 464def open_urlresource(url): 465 import urlparse, urllib2 466 467 requires('urlfetch') 468 filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 
469 470 fn = os.path.join(os.path.dirname(__file__), "data", filename) 471 if os.path.exists(fn): 472 return open(fn) 473 474 print >> get_original_stdout(), '\tfetching %s ...' % url 475 f = urllib2.urlopen(url, timeout=15) 476 try: 477 with open(fn, "wb") as out: 478 s = f.read() 479 while s: 480 out.write(s) 481 s = f.read() 482 finally: 483 f.close() 484 return open(fn) 485 486 487class WarningsRecorder(object): 488 """Convenience wrapper for the warnings list returned on 489 entry to the warnings.catch_warnings() context manager. 490 """ 491 def __init__(self, warnings_list): 492 self._warnings = warnings_list 493 self._last = 0 494 495 def __getattr__(self, attr): 496 if len(self._warnings) > self._last: 497 return getattr(self._warnings[-1], attr) 498 elif attr in warnings.WarningMessage._WARNING_DETAILS: 499 return None 500 raise AttributeError("%r has no attribute %r" % (self, attr)) 501 502 @property 503 def warnings(self): 504 return self._warnings[self._last:] 505 506 def reset(self): 507 self._last = len(self._warnings) 508 509 510def _filterwarnings(filters, quiet=False): 511 """Catch the warnings, then check if all the expected 512 warnings have been raised and re-raise unexpected warnings. 513 If 'quiet' is True, only re-raise the unexpected warnings. 514 """ 515 # Clear the warning registry of the calling module 516 # in order to re-raise the warnings. 517 frame = sys._getframe(2) 518 registry = frame.f_globals.get('__warningregistry__') 519 if registry: 520 registry.clear() 521 with warnings.catch_warnings(record=True) as w: 522 # Disable filters, to record all warnings. Because 523 # test_warnings swap the module, we need to look up 524 # in the sys.modules dictionary. 
525 sys.modules['warnings'].resetwarnings() 526 yield WarningsRecorder(w) 527 # Filter the recorded warnings 528 reraise = [warning.message for warning in w] 529 missing = [] 530 for msg, cat in filters: 531 seen = False 532 for exc in reraise[:]: 533 message = str(exc) 534 # Filter out the matching messages 535 if (re.match(msg, message, re.I) and 536 issubclass(exc.__class__, cat)): 537 seen = True 538 reraise.remove(exc) 539 if not seen and not quiet: 540 # This filter caught nothing 541 missing.append((msg, cat.__name__)) 542 for exc in reraise: 543 raise AssertionError("unhandled warning %r" % exc) 544 for filter in missing: 545 raise AssertionError("filter (%r, %s) did not caught any warning" % 546 filter) 547 548 549@contextlib.contextmanager 550def check_warnings(*filters, **kwargs): 551 """Context manager to silence warnings. 552 553 Accept 2-tuples as positional arguments: 554 ("message regexp", WarningCategory) 555 556 Optional argument: 557 - if 'quiet' is True, it does not fail if a filter catches nothing 558 (default False) 559 560 Without argument, it defaults to: 561 check_warnings(("", Warning), quiet=False) 562 """ 563 if not filters: 564 filters = (("", Warning),) 565 return _filterwarnings(filters, kwargs.get('quiet')) 566 567 568@contextlib.contextmanager 569def check_py3k_warnings(*filters, **kwargs): 570 """Context manager to silence py3k warnings. 
571 572 Accept 2-tuples as positional arguments: 573 ("message regexp", WarningCategory) 574 575 Optional argument: 576 - if 'quiet' is True, it does not fail if a filter catches nothing 577 (default False) 578 579 Without argument, it defaults to: 580 check_py3k_warnings(("", DeprecationWarning), quiet=False) 581 """ 582 if sys.py3kwarning: 583 if not filters: 584 filters = (("", DeprecationWarning),) 585 else: 586 # It should not raise any py3k warning 587 filters = () 588 return _filterwarnings(filters, kwargs.get('quiet')) 589 590 591class CleanImport(object): 592 """Context manager to force import to return a new module reference. 593 594 This is useful for testing module-level behaviours, such as 595 the emission of a DeprecationWarning on import. 596 597 Use like this: 598 599 with CleanImport("foo"): 600 importlib.import_module("foo") # new reference 601 """ 602 603 def __init__(self, *module_names): 604 self.original_modules = sys.modules.copy() 605 for module_name in module_names: 606 if module_name in sys.modules: 607 module = sys.modules[module_name] 608 # It is possible that module_name is just an alias for 609 # another module (e.g. stub for modules renamed in 3.x). 610 # In that case, we also need delete the real module to clear 611 # the import cache. 612 if module.__name__ != module_name: 613 del sys.modules[module.__name__] 614 del sys.modules[module_name] 615 616 def __enter__(self): 617 return self 618 619 def __exit__(self, *ignore_exc): 620 sys.modules.update(self.original_modules) 621 622 623class EnvironmentVarGuard(UserDict.DictMixin): 624 625 """Class to help protect the environment variable properly. 
Can be used as 626 a context manager.""" 627 628 def __init__(self): 629 self._environ = os.environ 630 self._changed = {} 631 632 def __getitem__(self, envvar): 633 return self._environ[envvar] 634 635 def __setitem__(self, envvar, value): 636 # Remember the initial value on the first access 637 if envvar not in self._changed: 638 self._changed[envvar] = self._environ.get(envvar) 639 self._environ[envvar] = value 640 641 def __delitem__(self, envvar): 642 # Remember the initial value on the first access 643 if envvar not in self._changed: 644 self._changed[envvar] = self._environ.get(envvar) 645 if envvar in self._environ: 646 del self._environ[envvar] 647 648 def keys(self): 649 return self._environ.keys() 650 651 def set(self, envvar, value): 652 self[envvar] = value 653 654 def unset(self, envvar): 655 del self[envvar] 656 657 def __enter__(self): 658 return self 659 660 def __exit__(self, *ignore_exc): 661 for (k, v) in self._changed.items(): 662 if v is None: 663 if k in self._environ: 664 del self._environ[k] 665 else: 666 self._environ[k] = v 667 os.environ = self._environ 668 669 670class DirsOnSysPath(object): 671 """Context manager to temporarily add directories to sys.path. 672 673 This makes a copy of sys.path, appends any directories given 674 as positional arguments, then reverts sys.path to the copied 675 settings when the context ends. 676 677 Note that *all* sys.path modifications in the body of the 678 context manager, including replacement of the object, 679 will be reverted at the end of the block. 
680 """ 681 682 def __init__(self, *paths): 683 self.original_value = sys.path[:] 684 self.original_object = sys.path 685 sys.path.extend(paths) 686 687 def __enter__(self): 688 return self 689 690 def __exit__(self, *ignore_exc): 691 sys.path = self.original_object 692 sys.path[:] = self.original_value 693 694 695class TransientResource(object): 696 697 """Raise ResourceDenied if an exception is raised while the context manager 698 is in effect that matches the specified exception and attributes.""" 699 700 def __init__(self, exc, **kwargs): 701 self.exc = exc 702 self.attrs = kwargs 703 704 def __enter__(self): 705 return self 706 707 def __exit__(self, type_=None, value=None, traceback=None): 708 """If type_ is a subclass of self.exc and value has attributes matching 709 self.attrs, raise ResourceDenied. Otherwise let the exception 710 propagate (if any).""" 711 if type_ is not None and issubclass(self.exc, type_): 712 for attr, attr_value in self.attrs.iteritems(): 713 if not hasattr(value, attr): 714 break 715 if getattr(value, attr) != attr_value: 716 break 717 else: 718 raise ResourceDenied("an optional resource is not available") 719 720 721@contextlib.contextmanager 722def transient_internet(): 723 """Return a context manager that raises ResourceDenied when various issues 724 with the Internet connection manifest themselves as exceptions.""" 725 time_out = TransientResource(IOError, errno=errno.ETIMEDOUT) 726 socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET) 727 ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET) 728 with time_out, socket_peer_reset, ioerror_peer_reset: 729 yield 730 731 732@contextlib.contextmanager 733def captured_output(stream_name): 734 """Run the 'with' statement body using a StringIO object in place of a 735 specific attribute on the sys module. 
736 Example use (with 'stream_name=stdout'):: 737 738 with captured_stdout() as s: 739 print "hello" 740 assert s.getvalue() == "hello" 741 """ 742 import StringIO 743 orig_stdout = getattr(sys, stream_name) 744 setattr(sys, stream_name, StringIO.StringIO()) 745 try: 746 yield getattr(sys, stream_name) 747 finally: 748 setattr(sys, stream_name, orig_stdout) 749 750def captured_stdout(): 751 return captured_output("stdout") 752 753def captured_stdin(): 754 return captured_output("stdin") 755 756def gc_collect(): 757 """Force as many objects as possible to be collected. 758 759 In non-CPython implementations of Python, this is needed because timely 760 deallocation is not guaranteed by the garbage collector. (Even in CPython 761 this can be the case in case of reference cycles.) This means that __del__ 762 methods may be called later than expected and weakrefs may remain alive for 763 longer than expected. This function tries its best to force all garbage 764 objects to disappear. 765 """ 766 gc.collect() 767 gc.collect() 768 gc.collect() 769 770 771#======================================================================= 772# Decorator for running a function in a different locale, correctly resetting 773# it afterwards. 
774 775def run_with_locale(catstr, *locales): 776 def decorator(func): 777 def inner(*args, **kwds): 778 try: 779 import locale 780 category = getattr(locale, catstr) 781 orig_locale = locale.setlocale(category) 782 except AttributeError: 783 # if the test author gives us an invalid category string 784 raise 785 except: 786 # cannot retrieve original locale, so do nothing 787 locale = orig_locale = None 788 else: 789 for loc in locales: 790 try: 791 locale.setlocale(category, loc) 792 break 793 except: 794 pass 795 796 # now run the function, resetting the locale on exceptions 797 try: 798 return func(*args, **kwds) 799 finally: 800 if locale and orig_locale: 801 locale.setlocale(category, orig_locale) 802 inner.func_name = func.func_name 803 inner.__doc__ = func.__doc__ 804 return inner 805 return decorator 806 807#======================================================================= 808# Big-memory-test support. Separate from 'resources' because memory use should be configurable. 809 810# Some handy shorthands. Note that these are used for byte-limits as well 811# as size-limits, in the various bigmem tests 812_1M = 1024*1024 813_1G = 1024 * _1M 814_2G = 2 * _1G 815_4G = 4 * _1G 816 817MAX_Py_ssize_t = sys.maxsize 818 819def set_memlimit(limit): 820 global max_memuse 821 global real_max_memuse 822 sizes = { 823 'k': 1024, 824 'm': _1M, 825 'g': _1G, 826 't': 1024*_1G, 827 } 828 m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, 829 re.IGNORECASE | re.VERBOSE) 830 if m is None: 831 raise ValueError('Invalid memory limit %r' % (limit,)) 832 memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) 833 real_max_memuse = memlimit 834 if memlimit > MAX_Py_ssize_t: 835 memlimit = MAX_Py_ssize_t 836 if memlimit < _2G - 1: 837 raise ValueError('Memory limit %r too low to be useful' % (limit,)) 838 max_memuse = memlimit 839 840def bigmemtest(minsize, memuse, overhead=5*_1M): 841 """Decorator for bigmem tests. 
842 843 'minsize' is the minimum useful size for the test (in arbitrary, 844 test-interpreted units.) 'memuse' is the number of 'bytes per size' for 845 the test, or a good estimate of it. 'overhead' specifies fixed overhead, 846 independent of the testsize, and defaults to 5Mb. 847 848 The decorator tries to guess a good value for 'size' and passes it to 849 the decorated test function. If minsize * memuse is more than the 850 allowed memory use (as defined by max_memuse), the test is skipped. 851 Otherwise, minsize is adjusted upward to use up to max_memuse. 852 """ 853 def decorator(f): 854 def wrapper(self): 855 if not max_memuse: 856 # If max_memuse is 0 (the default), 857 # we still want to run the tests with size set to a few kb, 858 # to make sure they work. We still want to avoid using 859 # too much memory, though, but we do that noisily. 860 maxsize = 5147 861 self.assertFalse(maxsize * memuse + overhead > 20 * _1M) 862 else: 863 maxsize = int((max_memuse - overhead) / memuse) 864 if maxsize < minsize: 865 # Really ought to print 'test skipped' or something 866 if verbose: 867 sys.stderr.write("Skipping %s because of memory " 868 "constraint\n" % (f.__name__,)) 869 return 870 # Try to keep some breathing room in memory use 871 maxsize = max(maxsize - 50 * _1M, minsize) 872 return f(self, maxsize) 873 wrapper.minsize = minsize 874 wrapper.memuse = memuse 875 wrapper.overhead = overhead 876 return wrapper 877 return decorator 878 879def precisionbigmemtest(size, memuse, overhead=5*_1M): 880 def decorator(f): 881 def wrapper(self): 882 if not real_max_memuse: 883 maxsize = 5147 884 else: 885 maxsize = size 886 887 if real_max_memuse and real_max_memuse < maxsize * memuse: 888 if verbose: 889 sys.stderr.write("Skipping %s because of memory " 890 "constraint\n" % (f.__name__,)) 891 return 892 893 return f(self, maxsize) 894 wrapper.size = size 895 wrapper.memuse = memuse 896 wrapper.overhead = overhead 897 return wrapper 898 return decorator 899 900def 
bigaddrspacetest(f): 901 """Decorator for tests that fill the address space.""" 902 def wrapper(self): 903 if max_memuse < MAX_Py_ssize_t: 904 if verbose: 905 sys.stderr.write("Skipping %s because of memory " 906 "constraint\n" % (f.__name__,)) 907 else: 908 return f(self) 909 return wrapper 910 911#======================================================================= 912# unittest integration. 913 914class BasicTestRunner: 915 def run(self, test): 916 result = unittest.TestResult() 917 test(result) 918 return result 919 920def _id(obj): 921 return obj 922 923def requires_resource(resource): 924 if resource_is_enabled(resource): 925 return _id 926 else: 927 return unittest.skip("resource {0!r} is not enabled".format(resource)) 928 929def cpython_only(test): 930 """ 931 Decorator for tests only applicable on CPython. 932 """ 933 return impl_detail(cpython=True)(test) 934 935def impl_detail(msg=None, **guards): 936 if check_impl_detail(**guards): 937 return _id 938 if msg is None: 939 guardnames, default = _parse_guards(guards) 940 if default: 941 msg = "implementation detail not available on {0}" 942 else: 943 msg = "implementation detail specific to {0}" 944 guardnames = sorted(guardnames.keys()) 945 msg = msg.format(' or '.join(guardnames)) 946 return unittest.skip(msg) 947 948def _parse_guards(guards): 949 # Returns a tuple ({platform_name: run_me}, default_value) 950 if not guards: 951 return ({'cpython': True}, False) 952 is_true = guards.values()[0] 953 assert guards.values() == [is_true] * len(guards) # all True or all False 954 return (guards, not is_true) 955 956# Use the following check to guard CPython's implementation-specific tests -- 957# or to run them only on the implementation(s) guarded by the arguments. 958def check_impl_detail(**guards): 959 """This function returns True or False depending on the host platform. 
def _run_suite(suite):
    """Run a unittest suite and raise TestFailed unless it succeeds.

    In verbose mode the suite is run by a TextTestRunner writing to
    sys.stdout; otherwise by the silent BasicTestRunner.  When exactly one
    error (and no failure) or exactly one failure (and no error) was
    recorded, its traceback text becomes the TestFailed message; in every
    other unsuccessful case a generic message is used.
    """
    runner = (unittest.TextTestRunner(sys.stdout, verbosity=2)
              if verbose else BasicTestRunner())
    result = runner.run(suite)
    if result.wasSuccessful():
        return

    errors = result.errors
    failures = result.failures
    if len(errors) == 1 and not failures:
        # Each entry is a (test, traceback_text) pair.
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)


def run_unittest(*classes):
    """Run the tests described by each argument as one suite.

    Each argument may be a str naming a module present in sys.modules (all
    test cases found in that module are added), a TestSuite or TestCase
    instance (added as is), or anything else, which is handed to
    unittest.makeSuite().  Raises ValueError for a str that is not a key
    of sys.modules, and TestFailed if the combined suite does not pass.
    """
    suite = unittest.TestSuite()
    ready_made = (unittest.TestSuite, unittest.TestCase)
    for item in classes:
        if isinstance(item, str):
            if item not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            tests = unittest.findTestCases(sys.modules[item])
        elif isinstance(item, ready_made):
            tests = item
        else:
            tests = unittest.makeSuite(item)
        suite.addTest(tests)
    _run_suite(suite)


#=======================================================================
# doctest driver.
def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed if any doctest fails.
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        verbosity = None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        # A single parenthesised argument prints the same text whether
        # print is a statement or a function.
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, t))
    return f, t

#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

# NOTE: we use thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.

def threading_setup():
    """Return a 1-tuple holding the current thread._count().

    The tuple is meant to be unpacked into threading_cleanup().
    """
    import thread
    return (thread._count(),)

def threading_cleanup(nb_threads):
    """Wait for thread._count() to come back to 'nb_threads'.

    Polls up to 10 times, sleeping 0.1 seconds after each mismatch, and
    gives up silently if the count never matches.
    """
    import thread
    import time

    _MAX_COUNT = 10
    for count in range(_MAX_COUNT):
        n = thread._count()
        if n == nb_threads:
            break
        time.sleep(0.1)
    # XXX print a warning in case of failure?

def reap_threads(func):
    """Decorator: run threading_cleanup() after 'func', even if it raises.

    The thread count is sampled before the call.  Positional and keyword
    arguments are both forwarded to 'func' and its result is returned.
    """
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.

    # A non-blocking wait needs both os.waitpid and os.WNOHANG; where either
    # is missing (e.g. on Windows) there is nothing to do.
    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
        return

    any_process = -1
    while True:
        try:
            pid, status = os.waitpid(any_process, os.WNOHANG)
        except OSError:
            # Typically raised once there are no child processes left.
            # Only OSError is caught, so KeyboardInterrupt and SystemExit
            # still propagate.
            break
        if pid == 0:
            # Children exist, but none of them has exited yet.
            break

def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    NOTE: This is only a best effort function.
    """
    try:
        # memoryview?
        return b.tobytes()
    except AttributeError:
        try:
            # iterable of ints?
            return b"".join(chr(x) for x in b)
        except TypeError:
            # Anything else is handed to bytes() directly.
            return bytes(b)