test_support.py revision 76249ea4a7ab1cb0fa41d967b2fb8975916cb955
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.test_support':
4    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import functools
9import gc
10import socket
11import sys
12import os
13import platform
14import shutil
15import warnings
16import unittest
17import importlib
18import UserDict
19import re
20import time
21import struct
22import sysconfig
23try:
24    import thread
25except ImportError:
26    thread = None
27
28__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
29           "verbose", "use_resources", "max_memuse", "record_original_stdout",
30           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
31           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
32           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
33           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
34           "open_urlresource", "check_warnings", "check_py3k_warnings",
35           "CleanImport", "EnvironmentVarGuard", "captured_output",
36           "captured_stdout", "TransientResource", "transient_internet",
37           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
38           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
39           "threading_cleanup", "reap_children", "cpython_only",
40           "check_impl_detail", "get_attribute", "py3k_bytes",
41           "import_fresh_module", "strip_python_stderr"]
43
44class Error(Exception):
45    """Base class for regression test exceptions."""
46
47class TestFailed(Error):
48    """Test failed."""
49
50class ResourceDenied(unittest.SkipTest):
51    """Test skipped because it requested a disallowed resource.
52
53    This is raised when a test calls requires() for a resource that
54    has not been enabled.  It is used to distinguish between expected
55    and unexpected skips.
56    """
57
58@contextlib.contextmanager
59def _ignore_deprecated_imports(ignore=True):
60    """Context manager to suppress package and module deprecation
61    warnings when importing them.
62
63    If ignore is False, this context manager has no effect."""
64    if ignore:
65        with warnings.catch_warnings():
66            warnings.filterwarnings("ignore", ".+ (module|package)",
67                                    DeprecationWarning)
68            yield
69    else:
70        yield
71
72
73def import_module(name, deprecated=False):
74    """Import and return the module to be tested, raising SkipTest if
75    it is not available.
76
77    If deprecated is True, any module or package deprecation messages
78    will be suppressed."""
79    with _ignore_deprecated_imports(deprecated):
80        try:
81            return importlib.import_module(name)
82        except ImportError, msg:
83            raise unittest.SkipTest(str(msg))
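
# Example usage (illustrative sketch; 'ctypes' is just a stand-in module name):
#
#     ctypes = import_module('ctypes')   # skips the calling test if missing
#
# Compare with a plain `import ctypes`, which would make the whole test file
# error out instead of being reported as skipped.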
84
85
86def _save_and_remove_module(name, orig_modules):
87    """Helper function to save and remove a module from sys.modules
88
89       Raise ImportError if the module can't be imported."""
90    # try to import the module and raise an error if it can't be imported
91    if name not in sys.modules:
92        __import__(name)
93        del sys.modules[name]
94    for modname in list(sys.modules):
95        if modname == name or modname.startswith(name + '.'):
96            orig_modules[modname] = sys.modules[modname]
97            del sys.modules[modname]
98
99def _save_and_block_module(name, orig_modules):
100    """Helper function to save and block a module in sys.modules
101
102       Return True if the module was in sys.modules, False otherwise."""
103    saved = True
104    try:
105        orig_modules[name] = sys.modules[name]
106    except KeyError:
107        saved = False
108    sys.modules[name] = None
109    return saved
110
111
112def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
113    """Imports and returns a module, deliberately bypassing the sys.modules cache
114    and importing a fresh copy of the module. Once the import is complete,
115    the sys.modules cache is restored to its original state.
116
117    Modules named in fresh are also imported anew if needed by the import.
118    If one of these modules can't be imported, None is returned.
119
120    Importing of modules named in blocked is prevented while the fresh import
121    takes place.
122
123    If deprecated is True, any module or package deprecation messages
124    will be suppressed."""
125    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
126    # checks to make sure that this utility function is working as expected
127    with _ignore_deprecated_imports(deprecated):
128        # Keep track of modules saved for later restoration as well
129        # as those which just need a blocking entry removed
130        orig_modules = {}
131        names_to_remove = []
132        _save_and_remove_module(name, orig_modules)
133        try:
134            for fresh_name in fresh:
135                _save_and_remove_module(fresh_name, orig_modules)
136            for blocked_name in blocked:
137                if not _save_and_block_module(blocked_name, orig_modules):
138                    names_to_remove.append(blocked_name)
139            fresh_module = importlib.import_module(name)
140        except ImportError:
141            fresh_module = None
142        finally:
143            for orig_name, module in orig_modules.items():
144                sys.modules[orig_name] = module
145            for name_to_remove in names_to_remove:
146                del sys.modules[name_to_remove]
147        return fresh_module
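
# Example usage (illustrative sketch): force the pure-Python implementation of
# a module by blocking its C accelerator, e.g.
#
#     py_json = import_fresh_module('json', blocked=['_json'])
#
# sys.modules is restored afterwards, so the cached 'json' module that other
# tests see is left untouched.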
148
149
150def get_attribute(obj, name):
151    """Get an attribute, raising SkipTest if AttributeError is raised."""
152    try:
153        attribute = getattr(obj, name)
154    except AttributeError:
155        raise unittest.SkipTest("module %s has no attribute %s" % (
156            obj.__name__, name))
157    else:
158        return attribute
159
160
161verbose = 1              # Flag set to 0 by regrtest.py
162use_resources = None     # Flag set to [] by regrtest.py
163max_memuse = 0           # Disable bigmem tests (they will still be run with
164                         # small sizes, to make sure they work.)
165real_max_memuse = 0
166
167# _original_stdout is meant to hold stdout at the time regrtest began.
168# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
169# The point is to have some flavor of stdout the user can actually see.
170_original_stdout = None
171def record_original_stdout(stdout):
172    global _original_stdout
173    _original_stdout = stdout
174
175def get_original_stdout():
176    return _original_stdout or sys.stdout
177
178def unload(name):
179    try:
180        del sys.modules[name]
181    except KeyError:
182        pass
183
184if sys.platform.startswith("win"):
185    def _waitfor(func, pathname, waitall=False):
186        # Perform the operation
187        func(pathname)
188        # Now setup the wait loop
189        if waitall:
190            dirname = pathname
191        else:
192            dirname, name = os.path.split(pathname)
193            dirname = dirname or '.'
194        # Check for `pathname` to be removed from the filesystem.
195        # The exponential backoff of the timeout amounts to a total
196        # of ~1 second after which the deletion is probably an error
197        # anyway.
198        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
199        # required when contention occurs.
200        timeout = 0.001
201        while timeout < 1.0:
202            # Note we are only testing for the existence of the file(s) in
203            # the contents of the directory regardless of any security or
204            # access rights.  If we have made it this far, we have sufficient
205            # permissions to do that much using Python's equivalent of the
206            # Windows API FindFirstFile.
207            # Other Windows APIs can fail or give incorrect results when
208            # dealing with files that are pending deletion.
209            L = os.listdir(dirname)
210            if not (L if waitall else name in L):
211                return
212            # Increase the timeout and try again
213            time.sleep(timeout)
214            timeout *= 2
215        warnings.warn('tests may fail, delete still pending for ' + pathname,
216                      RuntimeWarning, stacklevel=4)
217
218    def _unlink(filename):
219        _waitfor(os.unlink, filename)
220
221    def _rmdir(dirname):
222        _waitfor(os.rmdir, dirname)
223
224    def _rmtree(path):
225        def _rmtree_inner(path):
226            for name in os.listdir(path):
227                fullname = os.path.join(path, name)
228                if os.path.isdir(fullname):
229                    _waitfor(_rmtree_inner, fullname, waitall=True)
230                    os.rmdir(fullname)
231                else:
232                    os.unlink(fullname)
233        _waitfor(_rmtree_inner, path, waitall=True)
234        _waitfor(os.rmdir, path)
235else:
236    _unlink = os.unlink
237    _rmdir = os.rmdir
238    _rmtree = shutil.rmtree
239
240def unlink(filename):
241    try:
242        _unlink(filename)
243    except OSError:
244        pass
245
246def rmdir(dirname):
247    try:
248        _rmdir(dirname)
249    except OSError as error:
250        # The directory need not exist.
251        if error.errno != errno.ENOENT:
252            raise
253
254def rmtree(path):
255    try:
256        _rmtree(path)
257    except OSError, e:
258        # Unix returns ENOENT, Windows returns ESRCH.
259        if e.errno not in (errno.ENOENT, errno.ESRCH):
260            raise
261
262def forget(modname):
263    '''"Forget" a module was ever imported by removing it from sys.modules and
264    deleting any .pyc and .pyo files.'''
265    unload(modname)
266    for dirname in sys.path:
267        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
268        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
269        # the chance exists that there is no .pyc (and thus the 'try' statement
270        # is exited) but there is a .pyo file.
271        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
272
273# On some platforms, GUI tests should not be run even if they are
274# allowed in `use_resources'.
275if sys.platform.startswith('win'):
276    import ctypes
277    import ctypes.wintypes
278    def _is_gui_available():
279        UOI_FLAGS = 1
280        WSF_VISIBLE = 0x0001
281        class USEROBJECTFLAGS(ctypes.Structure):
282            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
283                        ("fReserved", ctypes.wintypes.BOOL),
284                        ("dwFlags", ctypes.wintypes.DWORD)]
285        dll = ctypes.windll.user32
286        h = dll.GetProcessWindowStation()
287        if not h:
288            raise ctypes.WinError()
289        uof = USEROBJECTFLAGS()
290        needed = ctypes.wintypes.DWORD()
291        res = dll.GetUserObjectInformationW(h,
292            UOI_FLAGS,
293            ctypes.byref(uof),
294            ctypes.sizeof(uof),
295            ctypes.byref(needed))
296        if not res:
297            raise ctypes.WinError()
298        return bool(uof.dwFlags & WSF_VISIBLE)
299else:
300    def _is_gui_available():
301        return True
302
303def is_resource_enabled(resource):
304    """Test whether a resource is enabled.  Known resources are set by
305    regrtest.py."""
306    return use_resources is not None and resource in use_resources
307
308def requires(resource, msg=None):
309    """Raise ResourceDenied if the specified resource is not available.
310
311    If the caller's module is __main__ the requirement is treated as met and
312    the check is skipped; it can only fail when regrtest.py is executing."""
313    if resource == 'gui' and not _is_gui_available():
314        raise unittest.SkipTest("Cannot use the 'gui' resource")
315    # see if the caller's module is __main__ - if so, treat as if
316    # the resource was set
317    if sys._getframe(1).f_globals.get("__name__") == "__main__":
318        return
319    if not is_resource_enabled(resource):
320        if msg is None:
321            msg = "Use of the `%s' resource not enabled" % resource
322        raise ResourceDenied(msg)
323
324
325# Don't use "localhost", since resolving it uses the DNS under recent
326# Windows versions (see issue #18792).
327HOST = "127.0.0.1"
328HOSTv6 = "::1"
329
330
331def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
332    """Returns an unused port that should be suitable for binding.  This is
333    achieved by creating a temporary socket with the given family and
334    socktype (default is AF_INET, SOCK_STREAM), binding it to HOST with the
335    port set to 0,
336    eliciting an unused ephemeral port from the OS.  The temporary socket is
337    then closed and deleted, and the ephemeral port is returned.
338
339    Either this method or bind_port() should be used for any tests where a
340    server socket needs to be bound to a particular port for the duration of
341    the test.  Which one to use depends on whether the calling code is creating
342    a python socket, or if an unused port needs to be provided in a constructor
343    or passed to an external program (i.e. the -accept argument to openssl's
344    s_server mode).  Always prefer bind_port() over find_unused_port() where
345    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
346    socket is bound to a hard coded port, the ability to run multiple instances
347    of the test simultaneously on the same host is compromised, which makes the
348    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
349    may simply manifest as a failed test, which can be recovered from without
350    intervention in most cases, but on Windows, the entire python process can
351    completely and utterly wedge, requiring someone to log in to the buildbot
352    and manually kill the affected process.
353
354    (This is easy to reproduce on Windows, unfortunately, and can be traced to
355    the SO_REUSEADDR socket option having different semantics on Windows versus
356    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
357    listen and then accept connections on identical host/ports.  An EADDRINUSE
358    socket.error will be raised at some point (depending on the platform and
359    the order bind and listen were called on each socket).
360
361    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
362    will ever be raised when attempting to bind two identical host/ports. When
363    accept() is called on each socket, the second caller's process will steal
364    the port from the first caller, leaving them both in an awkwardly wedged
365    state where they'll no longer respond to any signals or graceful kills, and
366    must be forcibly killed via OpenProcess()/TerminateProcess().
367
368    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
369    instead of SO_REUSEADDR, which effectively affords the same semantics as
370    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
371    Source world compared to Windows ones, this is a common mistake.  A quick
372    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
373    openssl.exe is called with the 's_server' option, for example. See
374    http://bugs.python.org/issue2550 for more info.  The following site also
375    has a very thorough description about the implications of both REUSEADDR
376    and EXCLUSIVEADDRUSE on Windows:
377    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
378
379    XXX: although this approach is a vast improvement on previous attempts to
380    elicit unused ports, it rests heavily on the assumption that the ephemeral
381    port returned to us by the OS won't immediately be dished back out to some
382    other process when we close and delete our temporary socket but before our
383    calling code has a chance to bind the returned port.  We can deal with this
384    issue if/when we come across it."""
385    tempsock = socket.socket(family, socktype)
386    port = bind_port(tempsock)
387    tempsock.close()
388    del tempsock
389    return port
390
391def bind_port(sock, host=HOST):
392    """Bind the socket to a free port and return the port number.  Relies on
393    ephemeral ports in order to ensure we are using an unbound port.  This is
394    important as many tests may be running simultaneously, especially in a
395    buildbot environment.  This method raises an exception if the sock.family
396    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
397    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
398    for TCP/IP sockets.  The only case for setting these options is testing
399    multicasting via multiple UDP sockets.
400
401    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
402    on Windows), it will be set on the socket.  This will prevent anyone else
403    from bind()'ing to our host/port for the duration of the test.
404    """
405    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
406        if hasattr(socket, 'SO_REUSEADDR'):
407            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
408                raise TestFailed("tests should never set the SO_REUSEADDR "   \
409                                 "socket option on TCP/IP sockets!")
410        if hasattr(socket, 'SO_REUSEPORT'):
411            try:
412                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
413                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
414                                     "socket option on TCP/IP sockets!")
415            except EnvironmentError:
416                # Python's socket module was compiled using modern headers
417                # thus defining SO_REUSEPORT but this process is running
418                # under an older kernel that does not support SO_REUSEPORT.
419                pass
420        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
421            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
422
423    sock.bind((host, 0))
424    port = sock.getsockname()[1]
425    return port
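
# Example usage (illustrative sketch): create the server socket first and let
# bind_port() pick a free port on HOST, e.g.
#
#     serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#     port = bind_port(serv)      # binds to (HOST, 0), returns the chosen port
#     serv.listen(1)
#
# find_unused_port() is only needed when the port number must be known before
# the listening socket can be created (e.g. to pass to an external program).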
426
427FUZZ = 1e-6
428
429def fcmp(x, y): # fuzzy comparison function
430    if isinstance(x, float) or isinstance(y, float):
431        try:
432            fuzz = (abs(x) + abs(y)) * FUZZ
433            if abs(x-y) <= fuzz:
434                return 0
435        except:
436            pass
437    elif type(x) == type(y) and isinstance(x, (tuple, list)):
438        for i in range(min(len(x), len(y))):
439            outcome = fcmp(x[i], y[i])
440            if outcome != 0:
441                return outcome
442        return (len(x) > len(y)) - (len(x) < len(y))
443    return (x > y) - (x < y)
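
# Worked examples (illustrative sketch) of the fuzzy comparison above:
#
#     fcmp(1.0, 1.0 + 1e-9)  == 0    # difference is below (|x|+|y|) * FUZZ
#     fcmp(1.0, 1.1)         == -1   # difference exceeds the fuzz tolerance
#     fcmp((1, 2.0), (1, 3)) == -1   # sequences are compared element-wise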
444
445
446# A constant likely larger than the underlying OS pipe buffer size, to
447# make writes blocking.
448# Windows limit seems to be around 512 B, and many Unix kernels have a
449# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
450# (see issue #17835 for a discussion of this number).
451PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
452
453# A constant likely larger than the underlying OS socket buffer size, to make
454# writes blocking.
455# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
456# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
457# for a discussion of this number).
458SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
459
460try:
461    unicode
462    have_unicode = True
463except NameError:
464    have_unicode = False
465
466is_jython = sys.platform.startswith('java')
467
468# Filename used for testing
469if os.name == 'java':
470    # Jython disallows @ in module names
471    TESTFN = '$test'
472elif os.name == 'riscos':
473    TESTFN = 'testfile'
474else:
475    TESTFN = '@test'
476    # Unicode name only used if TESTFN_ENCODING exists for the platform.
477    if have_unicode:
478        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
479        # TESTFN_UNICODE is a filename that can be encoded using the
480        # file system encoding, but *not* with the default (ascii) encoding
481        if isinstance('', unicode):
482            # python -U
483            # XXX perhaps unicode() should accept Unicode strings?
484            TESTFN_UNICODE = "@test-\xe0\xf2"
485        else:
486            # 2 latin characters.
487            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
488        TESTFN_ENCODING = sys.getfilesystemencoding()
489        # TESTFN_UNENCODABLE is a filename that should *not* be
490        # able to be encoded by *either* the default or filesystem encoding.
491        # This test really only makes sense on Windows NT platforms
492        # which have special Unicode support in posixmodule.
493        if (not hasattr(sys, "getwindowsversion") or
494                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
495            TESTFN_UNENCODABLE = None
496        else:
497            # Japanese characters (I think - from bug 846133)
498            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
499            try:
500                # XXX - Note - should be using TESTFN_ENCODING here - but for
501                # Windows, "mbcs" currently always operates as if in
502                # errors=ignore' mode - hence we get '?' characters rather than
503                # the exception.  'Latin1' operates as we expect - ie, fails.
504                # See [ 850997 ] mbcs encoding ignores errors
505                TESTFN_UNENCODABLE.encode("Latin1")
506            except UnicodeEncodeError:
507                pass
508            else:
509                print \
510                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
511                'Unicode filename tests may not be effective' \
512                % TESTFN_UNENCODABLE
513
514
515# Disambiguate TESTFN for parallel testing, while letting it remain a valid
516# module name.
517TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
518
519# Save the initial cwd
520SAVEDCWD = os.getcwd()
521
522@contextlib.contextmanager
523def temp_cwd(name='tempcwd', quiet=False):
524    """
525    Context manager that creates a temporary directory and sets it as the CWD.
526
527    The new CWD is created in the current directory and it's named *name*.
528    If *quiet* is False (default) and it's not possible to create or change
529    the CWD, an error is raised.  If it's True, only a warning is raised
530    and the original CWD is used.
531    """
532    if have_unicode and isinstance(name, unicode):
533        try:
534            name = name.encode(sys.getfilesystemencoding() or 'ascii')
535        except UnicodeEncodeError:
536            if not quiet:
537                raise unittest.SkipTest('unable to encode the cwd name with '
538                                        'the filesystem encoding.')
539    saved_dir = os.getcwd()
540    is_temporary = False
541    try:
542        os.mkdir(name)
543        os.chdir(name)
544        is_temporary = True
545    except OSError:
546        if not quiet:
547            raise
548        warnings.warn('tests may fail, unable to change the CWD to ' + name,
549                      RuntimeWarning, stacklevel=3)
550    try:
551        yield os.getcwd()
552    finally:
553        os.chdir(saved_dir)
554        if is_temporary:
555            rmtree(name)
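
# Example usage (illustrative sketch; the directory name is arbitrary):
#
#     with temp_cwd('scratchdir') as cwd:
#         # cwd is the path of the new working directory; files created with
#         # relative paths land inside it
#         open('output.txt', 'w').close()
#     # the previous CWD is restored and 'scratchdir' is removed on exit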
556
557
558def findfile(file, here=__file__, subdir=None):
559    """Try to find a file on sys.path and the working directory.  If it is not
560    found the argument passed to the function is returned (this does not
561    necessarily signal failure; could still be the legitimate path)."""
562    if os.path.isabs(file):
563        return file
564    if subdir is not None:
565        file = os.path.join(subdir, file)
566    path = sys.path
567    path = [os.path.dirname(here)] + path
568    for dn in path:
569        fn = os.path.join(dn, file)
570        if os.path.exists(fn): return fn
571    return file
572
573def sortdict(dict):
574    "Like repr(dict), but in sorted order."
575    items = dict.items()
576    items.sort()
577    reprpairs = ["%r: %r" % pair for pair in items]
578    withcommas = ", ".join(reprpairs)
579    return "{%s}" % withcommas
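
# Example (illustrative): sortdict({'b': 1, 'a': 2}) returns "{'a': 2, 'b': 1}",
# which keeps doctest and comparison output stable regardless of dict ordering.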
580
581def make_bad_fd():
582    """
583    Create an invalid file descriptor by opening and closing a file and return
584    its fd.
585    """
586    file = open(TESTFN, "wb")
587    try:
588        return file.fileno()
589    finally:
590        file.close()
591        unlink(TESTFN)
592
593def check_syntax_error(testcase, statement):
594    testcase.assertRaises(SyntaxError, compile, statement,
595                          '<test string>', 'exec')
596
597def open_urlresource(url, check=None):
598    import urlparse, urllib2
599
600    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
601
602    fn = os.path.join(os.path.dirname(__file__), "data", filename)
603
604    def check_valid_file(fn):
605        f = open(fn)
606        if check is None:
607            return f
608        elif check(f):
609            f.seek(0)
610            return f
611        f.close()
612
613    if os.path.exists(fn):
614        f = check_valid_file(fn)
615        if f is not None:
616            return f
617        unlink(fn)
618
619    # Verify the requirement before downloading the file
620    requires('urlfetch')
621
622    print >> get_original_stdout(), '\tfetching %s ...' % url
623    f = urllib2.urlopen(url, timeout=15)
624    try:
625        with open(fn, "wb") as out:
626            s = f.read()
627            while s:
628                out.write(s)
629                s = f.read()
630    finally:
631        f.close()
632
633    f = check_valid_file(fn)
634    if f is not None:
635        return f
636    raise TestFailed('invalid resource "%s"' % fn)
637
638
639class WarningsRecorder(object):
640    """Convenience wrapper for the warnings list returned on
641       entry to the warnings.catch_warnings() context manager.
642    """
643    def __init__(self, warnings_list):
644        self._warnings = warnings_list
645        self._last = 0
646
647    def __getattr__(self, attr):
648        if len(self._warnings) > self._last:
649            return getattr(self._warnings[-1], attr)
650        elif attr in warnings.WarningMessage._WARNING_DETAILS:
651            return None
652        raise AttributeError("%r has no attribute %r" % (self, attr))
653
654    @property
655    def warnings(self):
656        return self._warnings[self._last:]
657
658    def reset(self):
659        self._last = len(self._warnings)
660
661
@contextlib.contextmanager
662def _filterwarnings(filters, quiet=False):
663    """Catch the warnings, then check if all the expected
664    warnings have been raised and re-raise unexpected warnings.
665    If 'quiet' is True, only re-raise the unexpected warnings.
666    """
667    # Clear the warning registry of the calling module
668    # in order to re-raise the warnings.
669    frame = sys._getframe(2)
670    registry = frame.f_globals.get('__warningregistry__')
671    if registry:
672        registry.clear()
673    with warnings.catch_warnings(record=True) as w:
674        # Set filter "always" to record all warnings.  Because
675        # test_warnings swaps the module, we need to look it up in
676        # the sys.modules dictionary.
677        sys.modules['warnings'].simplefilter("always")
678        yield WarningsRecorder(w)
679    # Filter the recorded warnings
680    reraise = [warning.message for warning in w]
681    missing = []
682    for msg, cat in filters:
683        seen = False
684        for exc in reraise[:]:
685            message = str(exc)
686            # Filter out the matching messages
687            if (re.match(msg, message, re.I) and
688                issubclass(exc.__class__, cat)):
689                seen = True
690                reraise.remove(exc)
691        if not seen and not quiet:
692            # This filter caught nothing
693            missing.append((msg, cat.__name__))
694    if reraise:
695        raise AssertionError("unhandled warning %r" % reraise[0])
696    if missing:
697        raise AssertionError("filter (%r, %s) did not catch any warning" %
698                             missing[0])
699
700
702def check_warnings(*filters, **kwargs):
703    """Context manager to silence warnings.
704
705    Accept 2-tuples as positional arguments:
706        ("message regexp", WarningCategory)
707
708    Optional argument:
709     - if 'quiet' is True, it does not fail if a filter catches nothing
710        (default True without argument,
711         default False if some filters are defined)
712
713    Without argument, it defaults to:
714        check_warnings(("", Warning), quiet=True)
715    """
716    quiet = kwargs.get('quiet')
717    if not filters:
718        filters = (("", Warning),)
719        # Preserve backward compatibility
720        if quiet is None:
721            quiet = True
722    return _filterwarnings(filters, quiet)
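
# Example usage (illustrative sketch; the warning text and call are made up):
#
#     with check_warnings(("qux is deprecated", DeprecationWarning)):
#         call_deprecated_qux()           # hypothetical call that warns
#
#     with check_warnings(quiet=True) as w:
#         warnings.warn("foo")
#         assert str(w.message) == "foo"  # attributes of the last warning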
723
724
726def check_py3k_warnings(*filters, **kwargs):
727    """Context manager to silence py3k warnings.
728
729    Accept 2-tuples as positional arguments:
730        ("message regexp", WarningCategory)
731
732    Optional argument:
733     - if 'quiet' is True, it does not fail if a filter catches nothing
734        (default False)
735
736    Without argument, it defaults to:
737        check_py3k_warnings(("", DeprecationWarning), quiet=False)
738    """
739    if sys.py3kwarning:
740        if not filters:
741            filters = (("", DeprecationWarning),)
742    else:
743        # It should not raise any py3k warning
744        filters = ()
745    return _filterwarnings(filters, kwargs.get('quiet'))
746
747
748class CleanImport(object):
749    """Context manager to force import to return a new module reference.
750
751    This is useful for testing module-level behaviours, such as
752    the emission of a DeprecationWarning on import.
753
754    Use like this:
755
756        with CleanImport("foo"):
757            importlib.import_module("foo") # new reference
758    """
759
760    def __init__(self, *module_names):
761        self.original_modules = sys.modules.copy()
762        for module_name in module_names:
763            if module_name in sys.modules:
764                module = sys.modules[module_name]
765                # It is possible that module_name is just an alias for
766                # another module (e.g. stub for modules renamed in 3.x).
767                # In that case, we also need to delete the real module to clear
768                # the import cache.
769                if module.__name__ != module_name:
770                    del sys.modules[module.__name__]
771                del sys.modules[module_name]
772
773    def __enter__(self):
774        return self
775
776    def __exit__(self, *ignore_exc):
777        sys.modules.update(self.original_modules)
778
779
780class EnvironmentVarGuard(UserDict.DictMixin):
781
782    """Class to help protect environment variables properly.  Can be used as
783    a context manager."""
784
785    def __init__(self):
786        self._environ = os.environ
787        self._changed = {}
788
789    def __getitem__(self, envvar):
790        return self._environ[envvar]
791
792    def __setitem__(self, envvar, value):
793        # Remember the initial value on the first access
794        if envvar not in self._changed:
795            self._changed[envvar] = self._environ.get(envvar)
796        self._environ[envvar] = value
797
798    def __delitem__(self, envvar):
799        # Remember the initial value on the first access
800        if envvar not in self._changed:
801            self._changed[envvar] = self._environ.get(envvar)
802        if envvar in self._environ:
803            del self._environ[envvar]
804
805    def keys(self):
806        return self._environ.keys()
807
808    def set(self, envvar, value):
809        self[envvar] = value
810
811    def unset(self, envvar):
812        del self[envvar]
813
814    def __enter__(self):
815        return self
816
817    def __exit__(self, *ignore_exc):
818        for (k, v) in self._changed.items():
819            if v is None:
820                if k in self._environ:
821                    del self._environ[k]
822            else:
823                self._environ[k] = v
824        os.environ = self._environ
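
# Example usage (illustrative sketch; the variable names are arbitrary):
#
#     with EnvironmentVarGuard() as env:
#         env.set('LANG', 'C')    # temporarily override
#         env.unset('TZ')         # temporarily remove
#         ...                     # exercise code that reads os.environ
#     # the original values of LANG and TZ are restored on exit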
825
826
827class DirsOnSysPath(object):
828    """Context manager to temporarily add directories to sys.path.
829
830    This makes a copy of sys.path, appends any directories given
831    as positional arguments, then reverts sys.path to the copied
832    settings when the context ends.
833
834    Note that *all* sys.path modifications in the body of the
835    context manager, including replacement of the object,
836    will be reverted at the end of the block.
837    """
838
839    def __init__(self, *paths):
840        self.original_value = sys.path[:]
841        self.original_object = sys.path
842        sys.path.extend(paths)
843
844    def __enter__(self):
845        return self
846
847    def __exit__(self, *ignore_exc):
848        sys.path = self.original_object
849        sys.path[:] = self.original_value
850
851
852class TransientResource(object):
853
854    """Raise ResourceDenied if an exception is raised while the context manager
855    is in effect that matches the specified exception and attributes."""
856
857    def __init__(self, exc, **kwargs):
858        self.exc = exc
859        self.attrs = kwargs
860
861    def __enter__(self):
862        return self
863
864    def __exit__(self, type_=None, value=None, traceback=None):
865        """If type_ is a subclass of self.exc and value has attributes matching
866        self.attrs, raise ResourceDenied.  Otherwise let the exception
867        propagate (if any)."""
868        if type_ is not None and issubclass(self.exc, type_):
869            for attr, attr_value in self.attrs.iteritems():
870                if not hasattr(value, attr):
871                    break
872                if getattr(value, attr) != attr_value:
873                    break
874            else:
875                raise ResourceDenied("an optional resource is not available")
876
877
878@contextlib.contextmanager
879def transient_internet(resource_name, timeout=30.0, errnos=()):
880    """Return a context manager that raises ResourceDenied when various issues
881    with the Internet connection manifest themselves as exceptions."""
882    default_errnos = [
883        ('ECONNREFUSED', 111),
884        ('ECONNRESET', 104),
885        ('EHOSTUNREACH', 113),
886        ('ENETUNREACH', 101),
887        ('ETIMEDOUT', 110),
888    ]
889    default_gai_errnos = [
890        ('EAI_AGAIN', -3),
891        ('EAI_FAIL', -4),
892        ('EAI_NONAME', -2),
893        ('EAI_NODATA', -5),
894        # Windows defines EAI_NODATA as 11001, but its getaddrinfo()
895        # implementation actually returns WSANO_DATA, i.e. 11004.
896        ('WSANO_DATA', 11004),
897    ]
898
899    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
900    captured_errnos = errnos
901    gai_errnos = []
902    if not captured_errnos:
903        captured_errnos = [getattr(errno, name, num)
904                           for (name, num) in default_errnos]
905        gai_errnos = [getattr(socket, name, num)
906                      for (name, num) in default_gai_errnos]
907
908    def filter_error(err):
909        n = getattr(err, 'errno', None)
910        if (isinstance(err, socket.timeout) or
911            (isinstance(err, socket.gaierror) and n in gai_errnos) or
912            n in captured_errnos):
913            if not verbose:
914                sys.stderr.write(denied.args[0] + "\n")
915            raise denied
916
917    old_timeout = socket.getdefaulttimeout()
918    try:
919        if timeout is not None:
920            socket.setdefaulttimeout(timeout)
921        yield
922    except IOError as err:
923        # urllib can wrap original socket errors multiple times (!), we must
924        # unwrap to get at the original error.
925        while True:
926            a = err.args
927            if len(a) >= 1 and isinstance(a[0], IOError):
928                err = a[0]
929            # The error can also be wrapped as args[1]:
930            #    except socket.error as msg:
931            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
932            elif len(a) >= 2 and isinstance(a[1], IOError):
933                err = a[1]
934            else:
935                break
936        filter_error(err)
937        raise
938    # XXX should we catch generic exceptions and look for their
939    # __cause__ or __context__?
940    finally:
941        socket.setdefaulttimeout(old_timeout)
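
# Example usage (illustrative sketch; the host is a placeholder): network
# failures inside the block are turned into ResourceDenied (i.e. a skip)
# rather than test errors.
#
#     with transient_internet('www.example.com'):
#         urllib2.urlopen('http://www.example.com/', timeout=10)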
942
943
944@contextlib.contextmanager
945def captured_output(stream_name):
946    """Return a context manager used by captured_stdout and captured_stdin
947    that temporarily replaces the sys stream *stream_name* with a StringIO."""
948    import StringIO
949    orig_stdout = getattr(sys, stream_name)
950    setattr(sys, stream_name, StringIO.StringIO())
951    try:
952        yield getattr(sys, stream_name)
953    finally:
954        setattr(sys, stream_name, orig_stdout)
955
956def captured_stdout():
957    """Capture the output of sys.stdout:
958
959       with captured_stdout() as s:
960           print "hello"
961       self.assertEqual(s.getvalue(), "hello")
962    """
963    return captured_output("stdout")
964
965def captured_stderr():
966    return captured_output("stderr")
967
968def captured_stdin():
969    return captured_output("stdin")
970
971def gc_collect():
972    """Force as many objects as possible to be collected.
973
974    In non-CPython implementations of Python, this is needed because timely
975    deallocation is not guaranteed by the garbage collector.  (Even in CPython
976    this can be the case in case of reference cycles.)  This means that __del__
977    methods may be called later than expected and weakrefs may remain alive for
978    longer than expected.  This function tries its best to force all garbage
979    objects to disappear.
980    """
981    gc.collect()
982    if is_jython:
983        time.sleep(0.1)
984    gc.collect()
985    gc.collect()
986
987
988_header = '2P'
989if hasattr(sys, "gettotalrefcount"):
990    _header = '2P' + _header
991_vheader = _header + 'P'
992
993def calcobjsize(fmt):
994    return struct.calcsize(_header + fmt + '0P')
995
996def calcvobjsize(fmt):
997    return struct.calcsize(_vheader + fmt + '0P')
998
999
1000_TPFLAGS_HAVE_GC = 1<<14
1001_TPFLAGS_HEAPTYPE = 1<<9
1002
1003def check_sizeof(test, o, size):
1004    import _testcapi
1005    result = sys.getsizeof(o)
1006    # add GC header size
1007    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
1008        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
1009        size += _testcapi.SIZEOF_PYGC_HEAD
1010    msg = 'wrong size for %s: got %d, expected %d' \
1011            % (type(o), result, size)
1012    test.assertEqual(result, size, msg)
1013
1014
1015#=======================================================================
1016# Decorator for running a function in a different locale, correctly resetting
1017# it afterwards.
1018
1019def run_with_locale(catstr, *locales):
1020    def decorator(func):
1021        def inner(*args, **kwds):
1022            try:
1023                import locale
1024                category = getattr(locale, catstr)
1025                orig_locale = locale.setlocale(category)
1026            except AttributeError:
1027                # if the test author gives us an invalid category string
1028                raise
1029            except:
1030                # cannot retrieve original locale, so do nothing
1031                locale = orig_locale = None
1032            else:
1033                for loc in locales:
1034                    try:
1035                        locale.setlocale(category, loc)
1036                        break
1037                    except:
1038                        pass
1039
1040            # now run the function, resetting the locale on exceptions
1041            try:
1042                return func(*args, **kwds)
1043            finally:
1044                if locale and orig_locale:
1045                    locale.setlocale(category, orig_locale)
1046        inner.func_name = func.func_name
1047        inner.__doc__ = func.__doc__
1048        return inner
1049    return decorator
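
# Example usage (illustrative sketch; the locale names are examples only):
#
#     @run_with_locale('LC_NUMERIC', 'fr_FR.UTF-8', 'de_DE', '')
#     def test_float_formatting(self):
#         ...
#
# The first locale that can actually be set is used; the original locale is
# restored when the test returns or raises.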
1050
1051#=======================================================================
1052# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
1053
1054# Some handy shorthands. Note that these are used for byte-limits as well
1055# as size-limits, in the various bigmem tests
1056_1M = 1024*1024
1057_1G = 1024 * _1M
1058_2G = 2 * _1G
1059_4G = 4 * _1G
1060
1061MAX_Py_ssize_t = sys.maxsize
1062
1063def set_memlimit(limit):
1064    global max_memuse
1065    global real_max_memuse
1066    sizes = {
1067        'k': 1024,
1068        'm': _1M,
1069        'g': _1G,
1070        't': 1024*_1G,
1071    }
1072    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1073                 re.IGNORECASE | re.VERBOSE)
1074    if m is None:
1075        raise ValueError('Invalid memory limit %r' % (limit,))
1076    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1077    real_max_memuse = memlimit
1078    if memlimit > MAX_Py_ssize_t:
1079        memlimit = MAX_Py_ssize_t
1080    if memlimit < _2G - 1:
1081        raise ValueError('Memory limit %r too low to be useful' % (limit,))
1082    max_memuse = memlimit
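
# Worked example (illustrative): set_memlimit('2.5G') sets real_max_memuse to
# int(2.5 * 1024**3) bytes; max_memuse gets the same value capped at
# MAX_Py_ssize_t, and anything below 2G - 1 is rejected as too small.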
1083
1084def bigmemtest(minsize, memuse, overhead=5*_1M):
1085    """Decorator for bigmem tests.
1086
1087    'minsize' is the minimum useful size for the test (in arbitrary,
1088    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
1089    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
1090    independent of the testsize, and defaults to 5Mb.
1091
1092    The decorator tries to guess a good value for 'size' and passes it to
1093    the decorated test function. If minsize * memuse is more than the
1094    allowed memory use (as defined by max_memuse), the test is skipped.
1095    Otherwise, minsize is adjusted upward to use up to max_memuse.
1096    """
1097    def decorator(f):
1098        def wrapper(self):
1099            if not max_memuse:
1100                # If max_memuse is 0 (the default),
1101                # we still want to run the tests with size set to a few kb,
1102                # to make sure they work. We still want to avoid using
1103                # too much memory, though, but we do that noisily.
1104                maxsize = 5147
1105                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
1106            else:
1107                maxsize = int((max_memuse - overhead) / memuse)
1108                if maxsize < minsize:
1109                    # Really ought to print 'test skipped' or something
1110                    if verbose:
1111                        sys.stderr.write("Skipping %s because of memory "
1112                                         "constraint\n" % (f.__name__,))
1113                    return
1114                # Try to keep some breathing room in memory use
1115                maxsize = max(maxsize - 50 * _1M, minsize)
1116            return f(self, maxsize)
1117        wrapper.minsize = minsize
1118        wrapper.memuse = memuse
1119        wrapper.overhead = overhead
1120        return wrapper
1121    return decorator
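
# Example usage (illustrative sketch): the decorated test receives the chosen
# size as its second argument.
#
#     @bigmemtest(minsize=_2G, memuse=2)
#     def test_large_concat(self, size):
#         s = 'x' * size          # needs roughly size * memuse bytes
#         self.assertEqual(len(s), size)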
1122
1123def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
1124    def decorator(f):
1125        def wrapper(self):
1126            if not real_max_memuse:
1127                maxsize = 5147
1128            else:
1129                maxsize = size
1130
1131            if ((real_max_memuse or not dry_run)
1132                and real_max_memuse < maxsize * memuse):
1133                if verbose:
1134                    sys.stderr.write("Skipping %s because of memory "
1135                                     "constraint\n" % (f.__name__,))
1136                return
1137
1138            return f(self, maxsize)
1139        wrapper.size = size
1140        wrapper.memuse = memuse
1141        wrapper.overhead = overhead
1142        return wrapper
1143    return decorator
1144
1145def bigaddrspacetest(f):
1146    """Decorator for tests that fill the address space."""
1147    def wrapper(self):
1148        if max_memuse < MAX_Py_ssize_t:
1149            if verbose:
1150                sys.stderr.write("Skipping %s because of memory "
1151                                 "constraint\n" % (f.__name__,))
1152        else:
1153            return f(self)
1154    return wrapper
1155
1156#=======================================================================
1157# unittest integration.
1158
1159class BasicTestRunner:
1160    def run(self, test):
1161        result = unittest.TestResult()
1162        test(result)
1163        return result
1164
1165def _id(obj):
1166    return obj
1167
1168def requires_resource(resource):
1169    if resource == 'gui' and not _is_gui_available():
1170        return unittest.skip("resource 'gui' is not available")
1171    if is_resource_enabled(resource):
1172        return _id
1173    else:
1174        return unittest.skip("resource {0!r} is not enabled".format(resource))
1175
1176def cpython_only(test):
1177    """
1178    Decorator for tests only applicable on CPython.
1179    """
1180    return impl_detail(cpython=True)(test)
1181
1182def impl_detail(msg=None, **guards):
1183    if check_impl_detail(**guards):
1184        return _id
1185    if msg is None:
1186        guardnames, default = _parse_guards(guards)
1187        if default:
1188            msg = "implementation detail not available on {0}"
1189        else:
1190            msg = "implementation detail specific to {0}"
1191        guardnames = sorted(guardnames.keys())
1192        msg = msg.format(' or '.join(guardnames))
1193    return unittest.skip(msg)
1194
1195def _parse_guards(guards):
1196    # Returns a tuple ({platform_name: run_me}, default_value)
1197    if not guards:
1198        return ({'cpython': True}, False)
1199    is_true = guards.values()[0]
1200    assert guards.values() == [is_true] * len(guards)   # all True or all False
1201    return (guards, not is_true)
1202
1203# Use the following check to guard CPython's implementation-specific tests --
1204# or to run them only on the implementation(s) guarded by the arguments.
1205def check_impl_detail(**guards):
1206    """This function returns True or False depending on the host platform.
1207       Examples:
1208          if check_impl_detail():               # only on CPython (default)
1209          if check_impl_detail(jython=True):    # only on Jython
1210          if check_impl_detail(cpython=False):  # everywhere except on CPython
1211    """
1212    guards, default = _parse_guards(guards)
1213    return guards.get(platform.python_implementation().lower(), default)
1214
1215
1216
1217def _run_suite(suite):
1218    """Run tests from a unittest.TestSuite-derived class."""
1219    if verbose:
1220        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
1221    else:
1222        runner = BasicTestRunner()
1223
1224    result = runner.run(suite)
1225    if not result.wasSuccessful():
1226        if len(result.errors) == 1 and not result.failures:
1227            err = result.errors[0][1]
1228        elif len(result.failures) == 1 and not result.errors:
1229            err = result.failures[0][1]
1230        else:
1231            err = "multiple errors occurred"
1232            if not verbose:
1233                err += "; run in verbose mode for details"
1234        raise TestFailed(err)
1235
1236
1237def run_unittest(*classes):
1238    """Run tests from unittest.TestCase-derived classes."""
1239    valid_types = (unittest.TestSuite, unittest.TestCase)
1240    suite = unittest.TestSuite()
1241    for cls in classes:
1242        if isinstance(cls, str):
1243            if cls in sys.modules:
1244                suite.addTest(unittest.findTestCases(sys.modules[cls]))
1245            else:
1246                raise ValueError("str arguments must be keys in sys.modules")
1247        elif isinstance(cls, valid_types):
1248            suite.addTest(cls)
1249        else:
1250            suite.addTest(unittest.makeSuite(cls))
1251    _run_suite(suite)
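
# Example usage (illustrative sketch): the conventional test_main() pattern
# used by the regression tests.
#
#     def test_main():
#         run_unittest(SomeTestCase, AnotherTestCase)
#
#     if __name__ == '__main__':
#         test_main()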
1252
1253#=======================================================================
1254# Check for the presence of docstrings.
1255
1256HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
1257                   sys.platform == 'win32' or
1258                   sysconfig.get_config_var('WITH_DOC_STRINGS'))
1259
1260requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1261                                          "test requires docstrings")
1262
1263
1264#=======================================================================
1265# doctest driver.
1266
1267def run_doctest(module, verbosity=None):
1268    """Run doctest on the given module.  Return (#failures, #tests).
1269
1270    If optional argument verbosity is not specified (or is None), pass
1271    test_support's belief about verbosity on to doctest.  Else doctest's
1272    usual behavior is used (it searches sys.argv for -v).
1273    """
1274
1275    import doctest
1276
1277    if verbosity is None:
1278        verbosity = verbose
1279    else:
1280        verbosity = None
1281
1282    # Direct doctest output (normally just errors) to real stdout; doctest
1283    # output shouldn't be compared by regrtest.
1284    save_stdout = sys.stdout
1285    sys.stdout = get_original_stdout()
1286    try:
1287        f, t = doctest.testmod(module, verbose=verbosity)
1288        if f:
1289            raise TestFailed("%d of %d doctests failed" % (f, t))
1290    finally:
1291        sys.stdout = save_stdout
1292    if verbose:
1293        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
1294    return f, t
1295
1296#=======================================================================
1297# Threading support to prevent reporting refleaks when running regrtest.py -R
1298
1299# NOTE: we use thread._count() rather than threading.enumerate() (or the
1300# moral equivalent thereof) because a threading.Thread object is still alive
1301# until its __bootstrap() method has returned, even after it has been
1302# unregistered from the threading module.
1303# thread._count(), on the other hand, only gets decremented *after* the
1304# __bootstrap() method has returned, which gives us reliable reference counts
1305# at the end of a test run.
1306
1307def threading_setup():
1308    if thread:
1309        return thread._count(),
1310    else:
1311        return 1,
1312
1313def threading_cleanup(nb_threads):
1314    if not thread:
1315        return
1316
1317    _MAX_COUNT = 10
1318    for count in range(_MAX_COUNT):
1319        n = thread._count()
1320        if n == nb_threads:
1321            break
1322        time.sleep(0.1)
1323    # XXX print a warning in case of failure?
1324
1325def reap_threads(func):
1326    """Use this function when threads are being used.  This will
1327    ensure that the threads are cleaned up even when the test fails.
1328    If threading is unavailable this function does nothing.
1329    """
1330    if not thread:
1331        return func
1332
1333    @functools.wraps(func)
1334    def decorator(*args):
1335        key = threading_setup()
1336        try:
1337            return func(*args)
1338        finally:
1339            threading_cleanup(*key)
1340    return decorator
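
# Example usage (illustrative sketch; 'worker' is a made-up target function):
#
#     @reap_threads
#     def test_spawns_threads(self):
#         t = threading.Thread(target=worker)
#         t.start()
#         ...
#
# threading_setup()/threading_cleanup() then wait for thread._count() to drop
# back to its starting value before the test is considered finished.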
1341
1342def reap_children():
1343    """Use this function at the end of test_main() whenever sub-processes
1344    are started.  This will help ensure that no extra children (zombies)
1345    stick around to hog resources and create problems when looking
1346    for refleaks.
1347    """
1348
1349    # Reap all our dead child processes so we don't leave zombies around.
1350    # These hog resources and might be causing some of the buildbots to die.
1351    if hasattr(os, 'waitpid'):
1352        any_process = -1
1353        while True:
1354            try:
1355                # This will raise an exception on Windows.  That's ok.
1356                pid, status = os.waitpid(any_process, os.WNOHANG)
1357                if pid == 0:
1358                    break
1359            except:
1360                break
1361
1362@contextlib.contextmanager
1363def swap_attr(obj, attr, new_val):
1364    """Temporarily swap out an attribute with a new object.
1365
1366    Usage:
1367        with swap_attr(obj, "attr", 5):
1368            ...
1369
1370        This will set obj.attr to 5 for the duration of the with: block,
1371        restoring the old value at the end of the block. If `attr` doesn't
1372        exist on `obj`, it will be created and then deleted at the end of the
1373        block.
1374    """
1375    if hasattr(obj, attr):
1376        real_val = getattr(obj, attr)
1377        setattr(obj, attr, new_val)
1378        try:
1379            yield
1380        finally:
1381            setattr(obj, attr, real_val)
1382    else:
1383        setattr(obj, attr, new_val)
1384        try:
1385            yield
1386        finally:
1387            delattr(obj, attr)
1388
1389def py3k_bytes(b):
1390    """Emulate the py3k bytes() constructor.
1391
1392    NOTE: This is only a best effort function.
1393    """
1394    try:
1395        # memoryview?
1396        return b.tobytes()
1397    except AttributeError:
1398        try:
1399            # iterable of ints?
1400            return b"".join(chr(x) for x in b)
1401        except TypeError:
1402            return bytes(b)
1403
1404def args_from_interpreter_flags():
1405    """Return a list of command-line arguments reproducing the current
1406    settings in sys.flags."""
1407    import subprocess
1408    return subprocess._args_from_interpreter_flags()
1409
1410def strip_python_stderr(stderr):
1411    """Strip the stderr of a Python process from potential debug output
1412    emitted by the interpreter.
1413
1414    This will typically be run on the result of the communicate() method
1415    of a subprocess.Popen object.
1416    """
1417    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
1418    return stderr
1419