test_support.py revision 76249ea4a7ab1cb0fa41d967b2fb8975916cb955
"""Supporting definitions for the Python regression tests."""

if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')

import contextlib
import errno
import functools
import gc
import socket
import sys
import os
import platform
import shutil
import warnings
import unittest
import importlib
import UserDict
import re
import time
import struct
import sysconfig
try:
    import thread
except ImportError:
    thread = None

__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
           "verbose", "use_resources", "max_memuse", "record_original_stdout",
           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
           "open_urlresource", "check_warnings", "check_py3k_warnings",
           "CleanImport", "EnvironmentVarGuard", "captured_output",
           "captured_stdout", "TransientResource", "transient_internet",
           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
           "threading_cleanup", "reap_children", "cpython_only",
           "check_impl_detail", "get_attribute", "py3k_bytes",
           "import_fresh_module", "strip_python_stderr"]

class Error(Exception):
    """Base class for regression test exceptions."""

class TestFailed(Error):
    """Test failed."""

class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.
    """

@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager to suppress package and module deprecation
    warnings when importing them.

    If ignore is False, this context manager has no effect."""
    if ignore:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", ".+ (module|package)",
                                    DeprecationWarning)
            yield
    else:
        yield


def import_module(name, deprecated=False):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        except ImportError as msg:
            # Turn a missing module into a skip carrying the import message.
            raise unittest.SkipTest(str(msg))


def _save_and_remove_module(name, orig_modules):
    """Helper function to save and remove a module from sys.modules

    Every sys.modules entry named *name* or starting with *name* + '.' is
    stored in orig_modules and deleted from sys.modules.

    Raise ImportError if the module can't be imported."""
    # try to import the module and raise an error if it can't be imported
    if name not in sys.modules:
        __import__(name)
        del sys.modules[name]
    # Iterate over a copy of the keys: sys.modules is mutated in the loop.
    for modname in list(sys.modules):
        if modname == name or modname.startswith(name + '.'):
            orig_modules[modname] = sys.modules[modname]
            del sys.modules[modname]

def _save_and_block_module(name, orig_modules):
    """Helper function to save and block a module in sys.modules

    The sys.modules entry for *name* is replaced by None; the previous
    entry, if there was one, is stored in orig_modules.

    Return True if the module was in sys.modules, False otherwise."""
    saved = True
    try:
        orig_modules[name] = sys.modules[name]
    except KeyError:
        saved = False
    sys.modules[name] = None
    return saved


def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Imports and returns a module, deliberately bypassing the sys.modules cache
    and importing a fresh copy of the module. Once the import is complete,
    the sys.modules cache is restored to its original state.

    Modules named in fresh are also imported anew if needed by the import.
    If one of these modules can't be imported, None is returned.

    Importing of modules named in blocked is prevented while the fresh import
    takes place.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
    # checks to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # Keep track of modules saved for later restoration as well
        # as those which just need a blocking entry removed
        orig_modules = {}
        names_to_remove = []
        # Done outside the try block: an ImportError for *name* itself
        # propagates to the caller instead of producing None.
        _save_and_remove_module(name, orig_modules)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, orig_modules)
            for blocked_name in blocked:
                if not _save_and_block_module(blocked_name, orig_modules):
                    names_to_remove.append(blocked_name)
            fresh_module = importlib.import_module(name)
        except ImportError:
            fresh_module = None
        finally:
            # Put back every saved module, then drop the blocking entries
            # that had no original module behind them.
            for orig_name, module in orig_modules.items():
                sys.modules[orig_name] = module
            for name_to_remove in names_to_remove:
                del sys.modules[name_to_remove]
        return fresh_module


def get_attribute(obj, name):
    """Get an attribute, raising SkipTest if AttributeError is raised."""
    try:
        attribute = getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("module %s has no attribute %s" % (
            obj.__name__, name))
    else:
        return attribute


verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember *stdout* as the stream returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded stdout, or sys.stdout if none (or a false value)
    has been recorded."""
    return _original_stdout or sys.stdout

def unload(name):
    """Remove *name* from sys.modules; do nothing if it is not there."""
    try:
        del sys.modules[name]
    except KeyError:
        pass

if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then poll until the deletion is visible.

        With waitall false, wait until the last component of *pathname* no
        longer shows up in its parent directory listing; with waitall true,
        wait until the listing of *pathname* itself is empty.  A
        RuntimeWarning is issued if the wait loop runs out."""
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on a i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        def _rmtree_inner(path):
            # Empty *path*: recurse into sub-directories (waiting for each
            # one to drain before removing it) and unlink everything else.
            for name in os.listdir(path):
                fullname = os.path.join(path, name)
                if os.path.isdir(fullname):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    os.rmdir(fullname)
                else:
                    os.unlink(fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(os.rmdir, path)
else:
    _unlink = os.unlink
    _rmdir = os.rmdir
    _rmtree = shutil.rmtree

def unlink(filename):
    """Remove *filename*, ignoring any OSError (best effort)."""
    try:
        _unlink(filename)
    except OSError:
        pass

def rmdir(dirname):
    """Remove the directory *dirname*; a missing directory is not an error.

    Any OSError other than ENOENT is re-raised."""
    try:
        _rmdir(dirname)
    except OSError as error:
        # The directory need not exist.
        if error.errno != errno.ENOENT:
            raise

def rmtree(path):
    """Remove the directory tree at *path*; a missing tree is not an error.

    Any OSError other than ENOENT/ESRCH is re-raised."""
    try:
        _rmtree(path)
    except OSError as e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise

def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    for dirname in sys.path:
        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
        # The .pyo removal is a separate unlink() call (each one ignores
        # OSError on its own), because there may be a .pyo file even when
        # there is no .pyc.
        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))

# On some platforms, should not run gui test even if it is allowed
# in `use_resources'.
if sys.platform.startswith('win'):
    import ctypes
    import ctypes.wintypes
    def _is_gui_available():
        """Return True if the process window station has the WSF_VISIBLE
        flag set; raise ctypes.WinError() if either Windows call fails."""
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        return bool(uof.dwFlags & WSF_VISIBLE)
else:
    def _is_gui_available():
        """Return True: no GUI availability check is made off Windows."""
        return True

def is_resource_enabled(resource):
    """Test whether a resource is enabled.  Known resources are set by
    regrtest.py."""
    return use_resources is not None and resource in use_resources

def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    The 'gui' resource is denied whenever _is_gui_available() is false.
    Otherwise, if the caller's module is __main__ the resource is treated
    as enabled and the function simply returns; the check against
    use_resources only applies to other callers (e.g. when regrtest.py is
    executing).  Nothing is returned on success.

    *msg*, if given, replaces the default ResourceDenied message for a
    resource that is not enabled."""
    if resource == 'gui' and not _is_gui_available():
        # ResourceDenied is a unittest.SkipTest subclass, so callers that
        # catch SkipTest keep working; raising it matches the documented
        # contract of this function.
        raise ResourceDenied("Cannot use the 'gui' resource")
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe(1).f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)


# Don't use "localhost", since resolving it uses the DNS under recent
# Windows versions (see issue #18792).
327HOST = "127.0.0.1" 328HOSTv6 = "::1" 329 330 331def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): 332 """Returns an unused port that should be suitable for binding. This is 333 achieved by creating a temporary socket with the same family and type as 334 the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to 335 the specified host address (defaults to 0.0.0.0) with the port set to 0, 336 eliciting an unused ephemeral port from the OS. The temporary socket is 337 then closed and deleted, and the ephemeral port is returned. 338 339 Either this method or bind_port() should be used for any tests where a 340 server socket needs to be bound to a particular port for the duration of 341 the test. Which one to use depends on whether the calling code is creating 342 a python socket, or if an unused port needs to be provided in a constructor 343 or passed to an external program (i.e. the -accept argument to openssl's 344 s_server mode). Always prefer bind_port() over find_unused_port() where 345 possible. Hard coded ports should *NEVER* be used. As soon as a server 346 socket is bound to a hard coded port, the ability to run multiple instances 347 of the test simultaneously on the same host is compromised, which makes the 348 test a ticking time bomb in a buildbot environment. On Unix buildbots, this 349 may simply manifest as a failed test, which can be recovered from without 350 intervention in most cases, but on Windows, the entire python process can 351 completely and utterly wedge, requiring someone to log in to the buildbot 352 and manually kill the affected process. 353 354 (This is easy to reproduce on Windows, unfortunately, and can be traced to 355 the SO_REUSEADDR socket option having different semantics on Windows versus 356 Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, 357 listen and then accept connections on identical host/ports. 
An EADDRINUSE 358 socket.error will be raised at some point (depending on the platform and 359 the order bind and listen were called on each socket). 360 361 However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE 362 will ever be raised when attempting to bind two identical host/ports. When 363 accept() is called on each socket, the second caller's process will steal 364 the port from the first caller, leaving them both in an awkwardly wedged 365 state where they'll no longer respond to any signals or graceful kills, and 366 must be forcibly killed via OpenProcess()/TerminateProcess(). 367 368 The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option 369 instead of SO_REUSEADDR, which effectively affords the same semantics as 370 SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open 371 Source world compared to Windows ones, this is a common mistake. A quick 372 look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when 373 openssl.exe is called with the 's_server' option, for example. See 374 http://bugs.python.org/issue2550 for more info. The following site also 375 has a very thorough description about the implications of both REUSEADDR 376 and EXCLUSIVEADDRUSE on Windows: 377 http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) 378 379 XXX: although this approach is a vast improvement on previous attempts to 380 elicit unused ports, it rests heavily on the assumption that the ephemeral 381 port returned to us by the OS won't immediately be dished back out to some 382 other process when we close and delete our temporary socket but before our 383 calling code has a chance to bind the returned port. 
We can deal with this 384 issue if/when we come across it.""" 385 tempsock = socket.socket(family, socktype) 386 port = bind_port(tempsock) 387 tempsock.close() 388 del tempsock 389 return port 390 391def bind_port(sock, host=HOST): 392 """Bind the socket to a free port and return the port number. Relies on 393 ephemeral ports in order to ensure we are using an unbound port. This is 394 important as many tests may be running simultaneously, especially in a 395 buildbot environment. This method raises an exception if the sock.family 396 is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR 397 or SO_REUSEPORT set on it. Tests should *never* set these socket options 398 for TCP/IP sockets. The only case for setting these options is testing 399 multicasting via multiple UDP sockets. 400 401 Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. 402 on Windows), it will be set on the socket. This will prevent anyone else 403 from bind()'ing to our host/port for the duration of the test. 404 """ 405 if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: 406 if hasattr(socket, 'SO_REUSEADDR'): 407 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: 408 raise TestFailed("tests should never set the SO_REUSEADDR " \ 409 "socket option on TCP/IP sockets!") 410 if hasattr(socket, 'SO_REUSEPORT'): 411 try: 412 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: 413 raise TestFailed("tests should never set the SO_REUSEPORT " \ 414 "socket option on TCP/IP sockets!") 415 except EnvironmentError: 416 # Python's socket module was compiled using modern headers 417 # thus defining SO_REUSEPORT but this process is running 418 # under an older kernel that does not support SO_REUSEPORT. 
419 pass 420 if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): 421 sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) 422 423 sock.bind((host, 0)) 424 port = sock.getsockname()[1] 425 return port 426 427FUZZ = 1e-6 428 429def fcmp(x, y): # fuzzy comparison function 430 if isinstance(x, float) or isinstance(y, float): 431 try: 432 fuzz = (abs(x) + abs(y)) * FUZZ 433 if abs(x-y) <= fuzz: 434 return 0 435 except: 436 pass 437 elif type(x) == type(y) and isinstance(x, (tuple, list)): 438 for i in range(min(len(x), len(y))): 439 outcome = fcmp(x[i], y[i]) 440 if outcome != 0: 441 return outcome 442 return (len(x) > len(y)) - (len(x) < len(y)) 443 return (x > y) - (x < y) 444 445 446# A constant likely larger than the underlying OS pipe buffer size, to 447# make writes blocking. 448# Windows limit seems to be around 512 B, and many Unix kernels have a 449# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. 450# (see issue #17835 for a discussion of this number). 451PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 452 453# A constant likely larger than the underlying OS socket buffer size, to make 454# writes blocking. 455# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl 456# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 457# for a discussion of this number). 458SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 459 460try: 461 unicode 462 have_unicode = True 463except NameError: 464 have_unicode = False 465 466is_jython = sys.platform.startswith('java') 467 468# Filename used for testing 469if os.name == 'java': 470 # Jython disallows @ in module names 471 TESTFN = '$test' 472elif os.name == 'riscos': 473 TESTFN = 'testfile' 474else: 475 TESTFN = '@test' 476 # Unicode name only used if TEST_FN_ENCODING exists for the platform. 
477 if have_unicode: 478 # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() 479 # TESTFN_UNICODE is a filename that can be encoded using the 480 # file system encoding, but *not* with the default (ascii) encoding 481 if isinstance('', unicode): 482 # python -U 483 # XXX perhaps unicode() should accept Unicode strings? 484 TESTFN_UNICODE = "@test-\xe0\xf2" 485 else: 486 # 2 latin characters. 487 TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1") 488 TESTFN_ENCODING = sys.getfilesystemencoding() 489 # TESTFN_UNENCODABLE is a filename that should *not* be 490 # able to be encoded by *either* the default or filesystem encoding. 491 # This test really only makes sense on Windows NT platforms 492 # which have special Unicode support in posixmodule. 493 if (not hasattr(sys, "getwindowsversion") or 494 sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME 495 TESTFN_UNENCODABLE = None 496 else: 497 # Japanese characters (I think - from bug 846133) 498 TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"') 499 try: 500 # XXX - Note - should be using TESTFN_ENCODING here - but for 501 # Windows, "mbcs" currently always operates as if in 502 # errors=ignore' mode - hence we get '?' characters rather than 503 # the exception. 'Latin1' operates as we expect - ie, fails. 504 # See [ 850997 ] mbcs encoding ignores errors 505 TESTFN_UNENCODABLE.encode("Latin1") 506 except UnicodeEncodeError: 507 pass 508 else: 509 print \ 510 'WARNING: The filename %r CAN be encoded by the filesystem. ' \ 511 'Unicode filename tests may not be effective' \ 512 % TESTFN_UNENCODABLE 513 514 515# Disambiguate TESTFN for parallel testing, while letting it remain a valid 516# module name. 517TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) 518 519# Save the initial cwd 520SAVEDCWD = os.getcwd() 521 522@contextlib.contextmanager 523def temp_cwd(name='tempcwd', quiet=False): 524 """ 525 Context manager that creates a temporary directory and set it as CWD. 
526 527 The new CWD is created in the current directory and it's named *name*. 528 If *quiet* is False (default) and it's not possible to create or change 529 the CWD, an error is raised. If it's True, only a warning is raised 530 and the original CWD is used. 531 """ 532 if have_unicode and isinstance(name, unicode): 533 try: 534 name = name.encode(sys.getfilesystemencoding() or 'ascii') 535 except UnicodeEncodeError: 536 if not quiet: 537 raise unittest.SkipTest('unable to encode the cwd name with ' 538 'the filesystem encoding.') 539 saved_dir = os.getcwd() 540 is_temporary = False 541 try: 542 os.mkdir(name) 543 os.chdir(name) 544 is_temporary = True 545 except OSError: 546 if not quiet: 547 raise 548 warnings.warn('tests may fail, unable to change the CWD to ' + name, 549 RuntimeWarning, stacklevel=3) 550 try: 551 yield os.getcwd() 552 finally: 553 os.chdir(saved_dir) 554 if is_temporary: 555 rmtree(name) 556 557 558def findfile(file, here=__file__, subdir=None): 559 """Try to find a file on sys.path and the working directory. If it is not 560 found the argument passed to the function is returned (this does not 561 necessarily signal failure; could still be the legitimate path).""" 562 if os.path.isabs(file): 563 return file 564 if subdir is not None: 565 file = os.path.join(subdir, file) 566 path = sys.path 567 path = [os.path.dirname(here)] + path 568 for dn in path: 569 fn = os.path.join(dn, file) 570 if os.path.exists(fn): return fn 571 return file 572 573def sortdict(dict): 574 "Like repr(dict), but in sorted order." 575 items = dict.items() 576 items.sort() 577 reprpairs = ["%r: %r" % pair for pair in items] 578 withcommas = ", ".join(reprpairs) 579 return "{%s}" % withcommas 580 581def make_bad_fd(): 582 """ 583 Create an invalid file descriptor by opening and closing a file and return 584 its fd. 
585 """ 586 file = open(TESTFN, "wb") 587 try: 588 return file.fileno() 589 finally: 590 file.close() 591 unlink(TESTFN) 592 593def check_syntax_error(testcase, statement): 594 testcase.assertRaises(SyntaxError, compile, statement, 595 '<test string>', 'exec') 596 597def open_urlresource(url, check=None): 598 import urlparse, urllib2 599 600 filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 601 602 fn = os.path.join(os.path.dirname(__file__), "data", filename) 603 604 def check_valid_file(fn): 605 f = open(fn) 606 if check is None: 607 return f 608 elif check(f): 609 f.seek(0) 610 return f 611 f.close() 612 613 if os.path.exists(fn): 614 f = check_valid_file(fn) 615 if f is not None: 616 return f 617 unlink(fn) 618 619 # Verify the requirement before downloading the file 620 requires('urlfetch') 621 622 print >> get_original_stdout(), '\tfetching %s ...' % url 623 f = urllib2.urlopen(url, timeout=15) 624 try: 625 with open(fn, "wb") as out: 626 s = f.read() 627 while s: 628 out.write(s) 629 s = f.read() 630 finally: 631 f.close() 632 633 f = check_valid_file(fn) 634 if f is not None: 635 return f 636 raise TestFailed('invalid resource "%s"' % fn) 637 638 639class WarningsRecorder(object): 640 """Convenience wrapper for the warnings list returned on 641 entry to the warnings.catch_warnings() context manager. 
642 """ 643 def __init__(self, warnings_list): 644 self._warnings = warnings_list 645 self._last = 0 646 647 def __getattr__(self, attr): 648 if len(self._warnings) > self._last: 649 return getattr(self._warnings[-1], attr) 650 elif attr in warnings.WarningMessage._WARNING_DETAILS: 651 return None 652 raise AttributeError("%r has no attribute %r" % (self, attr)) 653 654 @property 655 def warnings(self): 656 return self._warnings[self._last:] 657 658 def reset(self): 659 self._last = len(self._warnings) 660 661 662def _filterwarnings(filters, quiet=False): 663 """Catch the warnings, then check if all the expected 664 warnings have been raised and re-raise unexpected warnings. 665 If 'quiet' is True, only re-raise the unexpected warnings. 666 """ 667 # Clear the warning registry of the calling module 668 # in order to re-raise the warnings. 669 frame = sys._getframe(2) 670 registry = frame.f_globals.get('__warningregistry__') 671 if registry: 672 registry.clear() 673 with warnings.catch_warnings(record=True) as w: 674 # Set filter "always" to record all warnings. Because 675 # test_warnings swap the module, we need to look up in 676 # the sys.modules dictionary. 
677 sys.modules['warnings'].simplefilter("always") 678 yield WarningsRecorder(w) 679 # Filter the recorded warnings 680 reraise = [warning.message for warning in w] 681 missing = [] 682 for msg, cat in filters: 683 seen = False 684 for exc in reraise[:]: 685 message = str(exc) 686 # Filter out the matching messages 687 if (re.match(msg, message, re.I) and 688 issubclass(exc.__class__, cat)): 689 seen = True 690 reraise.remove(exc) 691 if not seen and not quiet: 692 # This filter caught nothing 693 missing.append((msg, cat.__name__)) 694 if reraise: 695 raise AssertionError("unhandled warning %r" % reraise[0]) 696 if missing: 697 raise AssertionError("filter (%r, %s) did not catch any warning" % 698 missing[0]) 699 700 701@contextlib.contextmanager 702def check_warnings(*filters, **kwargs): 703 """Context manager to silence warnings. 704 705 Accept 2-tuples as positional arguments: 706 ("message regexp", WarningCategory) 707 708 Optional argument: 709 - if 'quiet' is True, it does not fail if a filter catches nothing 710 (default True without argument, 711 default False if some filters are defined) 712 713 Without argument, it defaults to: 714 check_warnings(("", Warning), quiet=True) 715 """ 716 quiet = kwargs.get('quiet') 717 if not filters: 718 filters = (("", Warning),) 719 # Preserve backward compatibility 720 if quiet is None: 721 quiet = True 722 return _filterwarnings(filters, quiet) 723 724 725@contextlib.contextmanager 726def check_py3k_warnings(*filters, **kwargs): 727 """Context manager to silence py3k warnings. 
728 729 Accept 2-tuples as positional arguments: 730 ("message regexp", WarningCategory) 731 732 Optional argument: 733 - if 'quiet' is True, it does not fail if a filter catches nothing 734 (default False) 735 736 Without argument, it defaults to: 737 check_py3k_warnings(("", DeprecationWarning), quiet=False) 738 """ 739 if sys.py3kwarning: 740 if not filters: 741 filters = (("", DeprecationWarning),) 742 else: 743 # It should not raise any py3k warning 744 filters = () 745 return _filterwarnings(filters, kwargs.get('quiet')) 746 747 748class CleanImport(object): 749 """Context manager to force import to return a new module reference. 750 751 This is useful for testing module-level behaviours, such as 752 the emission of a DeprecationWarning on import. 753 754 Use like this: 755 756 with CleanImport("foo"): 757 importlib.import_module("foo") # new reference 758 """ 759 760 def __init__(self, *module_names): 761 self.original_modules = sys.modules.copy() 762 for module_name in module_names: 763 if module_name in sys.modules: 764 module = sys.modules[module_name] 765 # It is possible that module_name is just an alias for 766 # another module (e.g. stub for modules renamed in 3.x). 767 # In that case, we also need delete the real module to clear 768 # the import cache. 769 if module.__name__ != module_name: 770 del sys.modules[module.__name__] 771 del sys.modules[module_name] 772 773 def __enter__(self): 774 return self 775 776 def __exit__(self, *ignore_exc): 777 sys.modules.update(self.original_modules) 778 779 780class EnvironmentVarGuard(UserDict.DictMixin): 781 782 """Class to help protect the environment variable properly. 
Can be used as 783 a context manager.""" 784 785 def __init__(self): 786 self._environ = os.environ 787 self._changed = {} 788 789 def __getitem__(self, envvar): 790 return self._environ[envvar] 791 792 def __setitem__(self, envvar, value): 793 # Remember the initial value on the first access 794 if envvar not in self._changed: 795 self._changed[envvar] = self._environ.get(envvar) 796 self._environ[envvar] = value 797 798 def __delitem__(self, envvar): 799 # Remember the initial value on the first access 800 if envvar not in self._changed: 801 self._changed[envvar] = self._environ.get(envvar) 802 if envvar in self._environ: 803 del self._environ[envvar] 804 805 def keys(self): 806 return self._environ.keys() 807 808 def set(self, envvar, value): 809 self[envvar] = value 810 811 def unset(self, envvar): 812 del self[envvar] 813 814 def __enter__(self): 815 return self 816 817 def __exit__(self, *ignore_exc): 818 for (k, v) in self._changed.items(): 819 if v is None: 820 if k in self._environ: 821 del self._environ[k] 822 else: 823 self._environ[k] = v 824 os.environ = self._environ 825 826 827class DirsOnSysPath(object): 828 """Context manager to temporarily add directories to sys.path. 829 830 This makes a copy of sys.path, appends any directories given 831 as positional arguments, then reverts sys.path to the copied 832 settings when the context ends. 833 834 Note that *all* sys.path modifications in the body of the 835 context manager, including replacement of the object, 836 will be reverted at the end of the block. 
837 """ 838 839 def __init__(self, *paths): 840 self.original_value = sys.path[:] 841 self.original_object = sys.path 842 sys.path.extend(paths) 843 844 def __enter__(self): 845 return self 846 847 def __exit__(self, *ignore_exc): 848 sys.path = self.original_object 849 sys.path[:] = self.original_value 850 851 852class TransientResource(object): 853 854 """Raise ResourceDenied if an exception is raised while the context manager 855 is in effect that matches the specified exception and attributes.""" 856 857 def __init__(self, exc, **kwargs): 858 self.exc = exc 859 self.attrs = kwargs 860 861 def __enter__(self): 862 return self 863 864 def __exit__(self, type_=None, value=None, traceback=None): 865 """If type_ is a subclass of self.exc and value has attributes matching 866 self.attrs, raise ResourceDenied. Otherwise let the exception 867 propagate (if any).""" 868 if type_ is not None and issubclass(self.exc, type_): 869 for attr, attr_value in self.attrs.iteritems(): 870 if not hasattr(value, attr): 871 break 872 if getattr(value, attr) != attr_value: 873 break 874 else: 875 raise ResourceDenied("an optional resource is not available") 876 877 878@contextlib.contextmanager 879def transient_internet(resource_name, timeout=30.0, errnos=()): 880 """Return a context manager that raises ResourceDenied when various issues 881 with the Internet connection manifest themselves as exceptions.""" 882 default_errnos = [ 883 ('ECONNREFUSED', 111), 884 ('ECONNRESET', 104), 885 ('EHOSTUNREACH', 113), 886 ('ENETUNREACH', 101), 887 ('ETIMEDOUT', 110), 888 ] 889 default_gai_errnos = [ 890 ('EAI_AGAIN', -3), 891 ('EAI_FAIL', -4), 892 ('EAI_NONAME', -2), 893 ('EAI_NODATA', -5), 894 # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo() 895 # implementation actually returns WSANO_DATA i.e. 11004. 
896 ('WSANO_DATA', 11004), 897 ] 898 899 denied = ResourceDenied("Resource '%s' is not available" % resource_name) 900 captured_errnos = errnos 901 gai_errnos = [] 902 if not captured_errnos: 903 captured_errnos = [getattr(errno, name, num) 904 for (name, num) in default_errnos] 905 gai_errnos = [getattr(socket, name, num) 906 for (name, num) in default_gai_errnos] 907 908 def filter_error(err): 909 n = getattr(err, 'errno', None) 910 if (isinstance(err, socket.timeout) or 911 (isinstance(err, socket.gaierror) and n in gai_errnos) or 912 n in captured_errnos): 913 if not verbose: 914 sys.stderr.write(denied.args[0] + "\n") 915 raise denied 916 917 old_timeout = socket.getdefaulttimeout() 918 try: 919 if timeout is not None: 920 socket.setdefaulttimeout(timeout) 921 yield 922 except IOError as err: 923 # urllib can wrap original socket errors multiple times (!), we must 924 # unwrap to get at the original error. 925 while True: 926 a = err.args 927 if len(a) >= 1 and isinstance(a[0], IOError): 928 err = a[0] 929 # The error can also be wrapped as args[1]: 930 # except socket.error as msg: 931 # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2]) 932 elif len(a) >= 2 and isinstance(a[1], IOError): 933 err = a[1] 934 else: 935 break 936 filter_error(err) 937 raise 938 # XXX should we catch generic exceptions and look for their 939 # __cause__ or __context__? 
940 finally: 941 socket.setdefaulttimeout(old_timeout) 942 943 944@contextlib.contextmanager 945def captured_output(stream_name): 946 """Return a context manager used by captured_stdout and captured_stdin 947 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 948 import StringIO 949 orig_stdout = getattr(sys, stream_name) 950 setattr(sys, stream_name, StringIO.StringIO()) 951 try: 952 yield getattr(sys, stream_name) 953 finally: 954 setattr(sys, stream_name, orig_stdout) 955 956def captured_stdout(): 957 """Capture the output of sys.stdout: 958 959 with captured_stdout() as s: 960 print "hello" 961 self.assertEqual(s.getvalue(), "hello") 962 """ 963 return captured_output("stdout") 964 965def captured_stderr(): 966 return captured_output("stderr") 967 968def captured_stdin(): 969 return captured_output("stdin") 970 971def gc_collect(): 972 """Force as many objects as possible to be collected. 973 974 In non-CPython implementations of Python, this is needed because timely 975 deallocation is not guaranteed by the garbage collector. (Even in CPython 976 this can be the case in case of reference cycles.) This means that __del__ 977 methods may be called later than expected and weakrefs may remain alive for 978 longer than expected. This function tries its best to force all garbage 979 objects to disappear. 
980 """ 981 gc.collect() 982 if is_jython: 983 time.sleep(0.1) 984 gc.collect() 985 gc.collect() 986 987 988_header = '2P' 989if hasattr(sys, "gettotalrefcount"): 990 _header = '2P' + _header 991_vheader = _header + 'P' 992 993def calcobjsize(fmt): 994 return struct.calcsize(_header + fmt + '0P') 995 996def calcvobjsize(fmt): 997 return struct.calcsize(_vheader + fmt + '0P') 998 999 1000_TPFLAGS_HAVE_GC = 1<<14 1001_TPFLAGS_HEAPTYPE = 1<<9 1002 1003def check_sizeof(test, o, size): 1004 import _testcapi 1005 result = sys.getsizeof(o) 1006 # add GC header size 1007 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 1008 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 1009 size += _testcapi.SIZEOF_PYGC_HEAD 1010 msg = 'wrong size for %s: got %d, expected %d' \ 1011 % (type(o), result, size) 1012 test.assertEqual(result, size, msg) 1013 1014 1015#======================================================================= 1016# Decorator for running a function in a different locale, correctly resetting 1017# it afterwards. 

def run_with_locale(catstr, *locales):
    """Return a decorator that runs the function under another locale.

    'catstr' is the name of a locale category attribute (e.g. 'LC_ALL').
    The first entry of 'locales' that can be set is used; if none can be
    set, or the current locale cannot be retrieved, the function simply runs
    under the current locale.  The original locale is restored afterwards.

    An invalid 'catstr' raises AttributeError when the decorated function is
    called.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # this locale is unavailable, try the next one
                        continue

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse 'limit' and store it in max_memuse and real_max_memuse.

    'limit' is a decimal number followed by K, M, G or T (any case) and an
    optional 'b', e.g. '2.5Gb'.  real_max_memuse receives the parsed value;
    max_memuse receives it capped at MAX_Py_ssize_t.

    Raises ValueError if 'limit' cannot be parsed or if the (capped) value
    is below 2G - 1.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the blank between the number and the unit optional.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    # Note: real_max_memuse is assigned before the "too low" check below.
    real_max_memuse = memlimit
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit

def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        # Expose the decorator arguments on the wrapper.
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
    """Decorator for bigmem tests that need one exact 'size'.

    With no memory limit set (real_max_memuse is 0) the test runs with a
    small size of 5147 when 'dry_run' is true and is skipped otherwise.
    With a limit set, the test runs with exactly 'size', or is skipped when
    real_max_memuse is below size * memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not real_max_memuse:
                maxsize = 5147
            else:
                maxsize = size

                if ((real_max_memuse or not dry_run)
                    and real_max_memuse < maxsize * memuse):
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return

            return f(self, maxsize)
        # Expose the decorator arguments on the wrapper.
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if verbose:
                sys.stderr.write("Skipping %s because of memory "
                                 "constraint\n" % (f.__name__,))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.

class BasicTestRunner:
    """Minimal runner that collects results in a plain unittest.TestResult."""

    def run(self, test):
        """Run 'test' against a fresh TestResult and return that result."""
        result = unittest.TestResult()
        test(result)
        return result

def _id(obj):
    """Identity decorator: return 'obj' unchanged."""
    return obj

def requires_resource(resource):
    """Return a decorator that skips the test unless 'resource' is enabled.

    The 'gui' resource is additionally skipped when _is_gui_available()
    reports that no GUI can be used.
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip("resource 'gui' is not available")
    if is_resource_enabled(resource):
        return _id
    else:
        return unittest.skip("resource {0!r} is not enabled".format(resource))

def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    return impl_detail(cpython=True)(test)

def impl_detail(msg=None, **guards):
    """Return a decorator that skips the test unless check_impl_detail(**guards).

    'msg' is the skip reason; when it is None a reason naming the guarded
    implementations is generated.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            msg = "implementation detail not available on {0}"
        else:
            msg = "implementation detail specific to {0}"
        guardnames = sorted(guardnames.keys())
        msg = msg.format(' or '.join(guardnames))
    return unittest.skip(msg)

def _parse_guards(guards):
    """Return a tuple ({platform_name: run_me}, default_value).

    With no guards the result selects CPython only.  Otherwise all guard
    values must be equal (all True or all False) and the default is the
    negation of that value.

    Raises AssertionError when the guard values are mixed.
    """
    if not guards:
        return ({'cpython': True}, False)
    values = list(guards.values())
    is_true = values[0]
    # Raised explicitly rather than with an assert statement so that the
    # check is still performed when Python runs with -O.
    if values != [is_true] * len(values):
        raise AssertionError("guards must be all True or all False")
    return (guards, not is_true)

# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    return guards.get(platform.python_implementation().lower(), default)



def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Raises TestFailed when the run was not successful.  The message is the
    traceback text of the single error or failure, or a generic message when
    there were several.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if not result.wasSuccessful():
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = "multiple errors occurred"
            if not verbose:
                err += "; run in verbose mode for details"
        raise TestFailed(err)


def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestSuite or TestCase instance,
    or the name of a module in sys.modules whose test cases are collected.
    Raises ValueError for a string that is not a key of sys.modules.
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(unittest.findTestCases(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)

#=======================================================================
# Check for the presence of docstrings.

HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
                   sys.platform == 'win32' or
                   sysconfig.get_config_var('WITH_DOC_STRINGS'))

requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed when at least one doctest fails.
    """

    import doctest

    # An explicit verbosity is not forwarded: passing None lets doctest
    # decide for itself.
    doctest_verbose = verbose if verbosity is None else None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    redirected_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        failures, tests = doctest.testmod(module, verbose=doctest_verbose)
        if failures:
            raise TestFailed("%d of %d doctests failed" % (failures, tests))
    finally:
        sys.stdout = redirected_stdout
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, tests))
    return failures, tests

#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

# NOTE: we use thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.

def threading_setup():
    """Return a 1-tuple holding the current thread count.

    The count is 1 when the thread module is unavailable.  The tuple is
    meant to be unpacked into threading_cleanup().
    """
    return (thread._count() if thread else 1,)

def threading_cleanup(nb_threads):
    """Wait for the thread count to get back to 'nb_threads'.

    Polls up to _MAX_COUNT times, sleeping 0.1 second after every mismatch,
    and gives up silently afterwards.  Does nothing when the thread module
    is unavailable.
    """
    if not thread:
        return

    _MAX_COUNT = 10
    polls = 0
    while polls < _MAX_COUNT and thread._count() != nb_threads:
        time.sleep(0.1)
        polls += 1
    # XXX print a warning in case of failure?

def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.
    If threading is unavailable this function does nothing.
    """
    if not thread:
        return func

    @functools.wraps(func)
    def decorator(*args, **kwargs):
        # Keyword arguments are forwarded as well as positional ones.
        key = threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator

def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # This will raise an exception on Windows.  That's ok.
                pid, status = os.waitpid(any_process, os.WNOHANG)
            except Exception:
                # Best effort: any failure here (no children left, missing
                # os.WNOHANG, ...) simply ends the loop.  KeyboardInterrupt
                # and SystemExit are deliberately not swallowed.
                break
            if pid == 0:
                # Children exist, but none has exited yet.
                break

@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily swap out an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        This will set obj.attr to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `attr` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.
    """
    if hasattr(obj, attr):
        real_val = getattr(obj, attr)
        setattr(obj, attr, new_val)
        try:
            yield
        finally:
            setattr(obj, attr, real_val)
    else:
        setattr(obj, attr, new_val)
        try:
            yield
        finally:
            delattr(obj, attr)

def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    NOTE: This is only a best effort function.
    """
    try:
        # memoryview?
        return b.tobytes()
    except AttributeError:
        try:
            # iterable of ints?
            return b"".join(chr(x) for x in b)
        except TypeError:
            return bytes(b)

def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags."""
    import subprocess
    return subprocess._args_from_interpreter_flags()

def strip_python_stderr(stderr):
    """Strip the stderr of a Python process from potential debug output
    emitted by the interpreter.

    This will typically be run on the result of the communicate() method
    of a subprocess.Popen object.
    """
    # Drop a trailing "[NNN refs]" line, then surrounding whitespace.
    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
    return stderr