test_support.py revision 0127de0b877600a95871e07aee8a092e9199002a
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.test_support':
4    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import functools
9import gc
10import socket
11import sys
12import os
13import platform
14import shutil
15import warnings
16import unittest
17import importlib
18import UserDict
19import re
20import time
21import struct
22import _testcapi
23import sysconfig
24try:
25    import thread
26except ImportError:
27    thread = None
28
29__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
30           "verbose", "use_resources", "max_memuse", "record_original_stdout",
31           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
32           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
33           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
34           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
35           "open_urlresource", "check_warnings", "check_py3k_warnings",
36           "CleanImport", "EnvironmentVarGuard", "captured_output",
37           "captured_stdout", "TransientResource", "transient_internet",
38           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
39           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
40           "threading_cleanup", "reap_children", "cpython_only",
41           "check_impl_detail", "get_attribute", "py3k_bytes",
42           "import_fresh_module", "threading_cleanup", "reap_children",
43           "strip_python_stderr"]
44
45class Error(Exception):
46    """Base class for regression test exceptions."""
47
48class TestFailed(Error):
49    """Test failed."""
50
51class ResourceDenied(unittest.SkipTest):
52    """Test skipped because it requested a disallowed resource.
53
54    This is raised when a test calls requires() for a resource that
55    has not been enabled.  It is used to distinguish between expected
56    and unexpected skips.
57    """
58
59@contextlib.contextmanager
60def _ignore_deprecated_imports(ignore=True):
61    """Context manager to suppress package and module deprecation
62    warnings when importing them.
63
64    If ignore is False, this context manager has no effect."""
65    if ignore:
66        with warnings.catch_warnings():
67            warnings.filterwarnings("ignore", ".+ (module|package)",
68                                    DeprecationWarning)
69            yield
70    else:
71        yield
72
73
74def import_module(name, deprecated=False):
75    """Import and return the module to be tested, raising SkipTest if
76    it is not available.
77
78    If deprecated is True, any module or package deprecation messages
79    will be suppressed."""
80    with _ignore_deprecated_imports(deprecated):
81        try:
82            return importlib.import_module(name)
83        except ImportError, msg:
84            raise unittest.SkipTest(str(msg))
85
86
87def _save_and_remove_module(name, orig_modules):
88    """Helper function to save and remove a module from sys.modules
89
90       Raise ImportError if the module can't be imported."""
91    # try to import the module and raise an error if it can't be imported
92    if name not in sys.modules:
93        __import__(name)
94        del sys.modules[name]
95    for modname in list(sys.modules):
96        if modname == name or modname.startswith(name + '.'):
97            orig_modules[modname] = sys.modules[modname]
98            del sys.modules[modname]
99
100def _save_and_block_module(name, orig_modules):
101    """Helper function to save and block a module in sys.modules
102
103       Return True if the module was in sys.modules, False otherwise."""
104    saved = True
105    try:
106        orig_modules[name] = sys.modules[name]
107    except KeyError:
108        saved = False
109    sys.modules[name] = None
110    return saved
111
112
113def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
114    """Imports and returns a module, deliberately bypassing the sys.modules cache
115    and importing a fresh copy of the module. Once the import is complete,
116    the sys.modules cache is restored to its original state.
117
118    Modules named in fresh are also imported anew if needed by the import.
119    If one of these modules can't be imported, None is returned.
120
121    Importing of modules named in blocked is prevented while the fresh import
122    takes place.
123
124    If deprecated is True, any module or package deprecation messages
125    will be suppressed."""
126    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
127    # checks to make sure that this utility function is working as expected
128    with _ignore_deprecated_imports(deprecated):
129        # Keep track of modules saved for later restoration as well
130        # as those which just need a blocking entry removed
131        orig_modules = {}
132        names_to_remove = []
133        _save_and_remove_module(name, orig_modules)
134        try:
135            for fresh_name in fresh:
136                _save_and_remove_module(fresh_name, orig_modules)
137            for blocked_name in blocked:
138                if not _save_and_block_module(blocked_name, orig_modules):
139                    names_to_remove.append(blocked_name)
140            fresh_module = importlib.import_module(name)
141        except ImportError:
142            fresh_module = None
143        finally:
144            for orig_name, module in orig_modules.items():
145                sys.modules[orig_name] = module
146            for name_to_remove in names_to_remove:
147                del sys.modules[name_to_remove]
148        return fresh_module
149
150
151def get_attribute(obj, name):
152    """Get an attribute, raising SkipTest if AttributeError is raised."""
153    try:
154        attribute = getattr(obj, name)
155    except AttributeError:
156        raise unittest.SkipTest("module %s has no attribute %s" % (
157            obj.__name__, name))
158    else:
159        return attribute
160
161
162verbose = 1              # Flag set to 0 by regrtest.py
163use_resources = None     # Flag set to [] by regrtest.py
164max_memuse = 0           # Disable bigmem tests (they will still be run with
165                         # small sizes, to make sure they work.)
166real_max_memuse = 0
167
168# _original_stdout is meant to hold stdout at the time regrtest began.
169# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
170# The point is to have some flavor of stdout the user can actually see.
171_original_stdout = None
172def record_original_stdout(stdout):
173    global _original_stdout
174    _original_stdout = stdout
175
176def get_original_stdout():
177    return _original_stdout or sys.stdout
178
179def unload(name):
180    try:
181        del sys.modules[name]
182    except KeyError:
183        pass
184
185if sys.platform.startswith("win"):
186    def _waitfor(func, pathname, waitall=False):
187        # Peform the operation
188        func(pathname)
189        # Now setup the wait loop
190        if waitall:
191            dirname = pathname
192        else:
193            dirname, name = os.path.split(pathname)
194            dirname = dirname or '.'
195        # Check for `pathname` to be removed from the filesystem.
196        # The exponential backoff of the timeout amounts to a total
197        # of ~1 second after which the deletion is probably an error
198        # anyway.
199        # Testing on a i7@4.3GHz shows that usually only 1 iteration is
200        # required when contention occurs.
201        timeout = 0.001
202        while timeout < 1.0:
203            # Note we are only testing for the existance of the file(s) in
204            # the contents of the directory regardless of any security or
205            # access rights.  If we have made it this far, we have sufficient
206            # permissions to do that much using Python's equivalent of the
207            # Windows API FindFirstFile.
208            # Other Windows APIs can fail or give incorrect results when
209            # dealing with files that are pending deletion.
210            L = os.listdir(dirname)
211            if not (L if waitall else name in L):
212                return
213            # Increase the timeout and try again
214            time.sleep(timeout)
215            timeout *= 2
216        warnings.warn('tests may fail, delete still pending for ' + pathname,
217                      RuntimeWarning, stacklevel=4)
218
219    def _unlink(filename):
220        _waitfor(os.unlink, filename)
221
222    def _rmdir(dirname):
223        _waitfor(os.rmdir, dirname)
224
225    def _rmtree(path):
226        def _rmtree_inner(path):
227            for name in os.listdir(path):
228                fullname = os.path.join(path, name)
229                if os.path.isdir(fullname):
230                    _waitfor(_rmtree_inner, fullname, waitall=True)
231                    os.rmdir(fullname)
232                else:
233                    os.unlink(fullname)
234        _waitfor(_rmtree_inner, path, waitall=True)
235        _waitfor(os.rmdir, path)
236else:
237    _unlink = os.unlink
238    _rmdir = os.rmdir
239    _rmtree = shutil.rmtree
240
241def unlink(filename):
242    try:
243        _unlink(filename)
244    except OSError:
245        pass
246
247def rmdir(dirname):
248    try:
249        _rmdir(dirname)
250    except OSError as error:
251        # The directory need not exist.
252        if error.errno != errno.ENOENT:
253            raise
254
255def rmtree(path):
256    try:
257        _rmtree(path)
258    except OSError, e:
259        # Unix returns ENOENT, Windows returns ESRCH.
260        if e.errno not in (errno.ENOENT, errno.ESRCH):
261            raise
262
263def forget(modname):
264    '''"Forget" a module was ever imported by removing it from sys.modules and
265    deleting any .pyc and .pyo files.'''
266    unload(modname)
267    for dirname in sys.path:
268        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
269        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
270        # the chance exists that there is no .pyc (and thus the 'try' statement
271        # is exited) but there is a .pyo file.
272        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
273
274def is_resource_enabled(resource):
275    """Test whether a resource is enabled.  Known resources are set by
276    regrtest.py."""
277    return use_resources is not None and resource in use_resources
278
279def requires(resource, msg=None):
280    """Raise ResourceDenied if the specified resource is not available.
281
282    If the caller's module is __main__ then automatically return True.  The
283    possibility of False being returned occurs when regrtest.py is executing."""
284    # see if the caller's module is __main__ - if so, treat as if
285    # the resource was set
286    if sys._getframe(1).f_globals.get("__name__") == "__main__":
287        return
288    if not is_resource_enabled(resource):
289        if msg is None:
290            msg = "Use of the `%s' resource not enabled" % resource
291        raise ResourceDenied(msg)
292
293HOST = 'localhost'
294
295def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
296    """Returns an unused port that should be suitable for binding.  This is
297    achieved by creating a temporary socket with the same family and type as
298    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
299    the specified host address (defaults to 0.0.0.0) with the port set to 0,
300    eliciting an unused ephemeral port from the OS.  The temporary socket is
301    then closed and deleted, and the ephemeral port is returned.
302
303    Either this method or bind_port() should be used for any tests where a
304    server socket needs to be bound to a particular port for the duration of
305    the test.  Which one to use depends on whether the calling code is creating
306    a python socket, or if an unused port needs to be provided in a constructor
307    or passed to an external program (i.e. the -accept argument to openssl's
308    s_server mode).  Always prefer bind_port() over find_unused_port() where
309    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
310    socket is bound to a hard coded port, the ability to run multiple instances
311    of the test simultaneously on the same host is compromised, which makes the
312    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
313    may simply manifest as a failed test, which can be recovered from without
314    intervention in most cases, but on Windows, the entire python process can
315    completely and utterly wedge, requiring someone to log in to the buildbot
316    and manually kill the affected process.
317
318    (This is easy to reproduce on Windows, unfortunately, and can be traced to
319    the SO_REUSEADDR socket option having different semantics on Windows versus
320    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
321    listen and then accept connections on identical host/ports.  An EADDRINUSE
322    socket.error will be raised at some point (depending on the platform and
323    the order bind and listen were called on each socket).
324
325    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
326    will ever be raised when attempting to bind two identical host/ports. When
327    accept() is called on each socket, the second caller's process will steal
328    the port from the first caller, leaving them both in an awkwardly wedged
329    state where they'll no longer respond to any signals or graceful kills, and
330    must be forcibly killed via OpenProcess()/TerminateProcess().
331
332    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
333    instead of SO_REUSEADDR, which effectively affords the same semantics as
334    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
335    Source world compared to Windows ones, this is a common mistake.  A quick
336    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
337    openssl.exe is called with the 's_server' option, for example. See
338    http://bugs.python.org/issue2550 for more info.  The following site also
339    has a very thorough description about the implications of both REUSEADDR
340    and EXCLUSIVEADDRUSE on Windows:
341    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
342
343    XXX: although this approach is a vast improvement on previous attempts to
344    elicit unused ports, it rests heavily on the assumption that the ephemeral
345    port returned to us by the OS won't immediately be dished back out to some
346    other process when we close and delete our temporary socket but before our
347    calling code has a chance to bind the returned port.  We can deal with this
348    issue if/when we come across it."""
349    tempsock = socket.socket(family, socktype)
350    port = bind_port(tempsock)
351    tempsock.close()
352    del tempsock
353    return port
354
355def bind_port(sock, host=HOST):
356    """Bind the socket to a free port and return the port number.  Relies on
357    ephemeral ports in order to ensure we are using an unbound port.  This is
358    important as many tests may be running simultaneously, especially in a
359    buildbot environment.  This method raises an exception if the sock.family
360    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
361    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
362    for TCP/IP sockets.  The only case for setting these options is testing
363    multicasting via multiple UDP sockets.
364
365    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
366    on Windows), it will be set on the socket.  This will prevent anyone else
367    from bind()'ing to our host/port for the duration of the test.
368    """
369    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
370        if hasattr(socket, 'SO_REUSEADDR'):
371            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
372                raise TestFailed("tests should never set the SO_REUSEADDR "   \
373                                 "socket option on TCP/IP sockets!")
374        if hasattr(socket, 'SO_REUSEPORT'):
375            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
376                raise TestFailed("tests should never set the SO_REUSEPORT "   \
377                                 "socket option on TCP/IP sockets!")
378        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
379            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
380
381    sock.bind((host, 0))
382    port = sock.getsockname()[1]
383    return port
384
385FUZZ = 1e-6
386
387def fcmp(x, y): # fuzzy comparison function
388    if isinstance(x, float) or isinstance(y, float):
389        try:
390            fuzz = (abs(x) + abs(y)) * FUZZ
391            if abs(x-y) <= fuzz:
392                return 0
393        except:
394            pass
395    elif type(x) == type(y) and isinstance(x, (tuple, list)):
396        for i in range(min(len(x), len(y))):
397            outcome = fcmp(x[i], y[i])
398            if outcome != 0:
399                return outcome
400        return (len(x) > len(y)) - (len(x) < len(y))
401    return (x > y) - (x < y)
402
403try:
404    unicode
405    have_unicode = True
406except NameError:
407    have_unicode = False
408
409is_jython = sys.platform.startswith('java')
410
411# Filename used for testing
412if os.name == 'java':
413    # Jython disallows @ in module names
414    TESTFN = '$test'
415elif os.name == 'riscos':
416    TESTFN = 'testfile'
417else:
418    TESTFN = '@test'
419    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
420    if have_unicode:
421        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
422        # TESTFN_UNICODE is a filename that can be encoded using the
423        # file system encoding, but *not* with the default (ascii) encoding
424        if isinstance('', unicode):
425            # python -U
426            # XXX perhaps unicode() should accept Unicode strings?
427            TESTFN_UNICODE = "@test-\xe0\xf2"
428        else:
429            # 2 latin characters.
430            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
431        TESTFN_ENCODING = sys.getfilesystemencoding()
432        # TESTFN_UNENCODABLE is a filename that should *not* be
433        # able to be encoded by *either* the default or filesystem encoding.
434        # This test really only makes sense on Windows NT platforms
435        # which have special Unicode support in posixmodule.
436        if (not hasattr(sys, "getwindowsversion") or
437                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
438            TESTFN_UNENCODABLE = None
439        else:
440            # Japanese characters (I think - from bug 846133)
441            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
442            try:
443                # XXX - Note - should be using TESTFN_ENCODING here - but for
444                # Windows, "mbcs" currently always operates as if in
445                # errors=ignore' mode - hence we get '?' characters rather than
446                # the exception.  'Latin1' operates as we expect - ie, fails.
447                # See [ 850997 ] mbcs encoding ignores errors
448                TESTFN_UNENCODABLE.encode("Latin1")
449            except UnicodeEncodeError:
450                pass
451            else:
452                print \
453                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
454                'Unicode filename tests may not be effective' \
455                % TESTFN_UNENCODABLE
456
457
458# Disambiguate TESTFN for parallel testing, while letting it remain a valid
459# module name.
460TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
461
462# Save the initial cwd
463SAVEDCWD = os.getcwd()
464
465@contextlib.contextmanager
466def temp_cwd(name='tempcwd', quiet=False):
467    """
468    Context manager that creates a temporary directory and set it as CWD.
469
470    The new CWD is created in the current directory and it's named *name*.
471    If *quiet* is False (default) and it's not possible to create or change
472    the CWD, an error is raised.  If it's True, only a warning is raised
473    and the original CWD is used.
474    """
475    if have_unicode and isinstance(name, unicode):
476        try:
477            name = name.encode(sys.getfilesystemencoding() or 'ascii')
478        except UnicodeEncodeError:
479            if not quiet:
480                raise unittest.SkipTest('unable to encode the cwd name with '
481                                        'the filesystem encoding.')
482    saved_dir = os.getcwd()
483    is_temporary = False
484    try:
485        os.mkdir(name)
486        os.chdir(name)
487        is_temporary = True
488    except OSError:
489        if not quiet:
490            raise
491        warnings.warn('tests may fail, unable to change the CWD to ' + name,
492                      RuntimeWarning, stacklevel=3)
493    try:
494        yield os.getcwd()
495    finally:
496        os.chdir(saved_dir)
497        if is_temporary:
498            rmtree(name)
499
500
501def findfile(file, here=__file__, subdir=None):
502    """Try to find a file on sys.path and the working directory.  If it is not
503    found the argument passed to the function is returned (this does not
504    necessarily signal failure; could still be the legitimate path)."""
505    if os.path.isabs(file):
506        return file
507    if subdir is not None:
508        file = os.path.join(subdir, file)
509    path = sys.path
510    path = [os.path.dirname(here)] + path
511    for dn in path:
512        fn = os.path.join(dn, file)
513        if os.path.exists(fn): return fn
514    return file
515
516def sortdict(dict):
517    "Like repr(dict), but in sorted order."
518    items = dict.items()
519    items.sort()
520    reprpairs = ["%r: %r" % pair for pair in items]
521    withcommas = ", ".join(reprpairs)
522    return "{%s}" % withcommas
523
524def make_bad_fd():
525    """
526    Create an invalid file descriptor by opening and closing a file and return
527    its fd.
528    """
529    file = open(TESTFN, "wb")
530    try:
531        return file.fileno()
532    finally:
533        file.close()
534        unlink(TESTFN)
535
536def check_syntax_error(testcase, statement):
537    testcase.assertRaises(SyntaxError, compile, statement,
538                          '<test string>', 'exec')
539
540def open_urlresource(url, check=None):
541    import urlparse, urllib2
542
543    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
544
545    fn = os.path.join(os.path.dirname(__file__), "data", filename)
546
547    def check_valid_file(fn):
548        f = open(fn)
549        if check is None:
550            return f
551        elif check(f):
552            f.seek(0)
553            return f
554        f.close()
555
556    if os.path.exists(fn):
557        f = check_valid_file(fn)
558        if f is not None:
559            return f
560        unlink(fn)
561
562    # Verify the requirement before downloading the file
563    requires('urlfetch')
564
565    print >> get_original_stdout(), '\tfetching %s ...' % url
566    f = urllib2.urlopen(url, timeout=15)
567    try:
568        with open(fn, "wb") as out:
569            s = f.read()
570            while s:
571                out.write(s)
572                s = f.read()
573    finally:
574        f.close()
575
576    f = check_valid_file(fn)
577    if f is not None:
578        return f
579    raise TestFailed('invalid resource "%s"' % fn)
580
581
582class WarningsRecorder(object):
583    """Convenience wrapper for the warnings list returned on
584       entry to the warnings.catch_warnings() context manager.
585    """
586    def __init__(self, warnings_list):
587        self._warnings = warnings_list
588        self._last = 0
589
590    def __getattr__(self, attr):
591        if len(self._warnings) > self._last:
592            return getattr(self._warnings[-1], attr)
593        elif attr in warnings.WarningMessage._WARNING_DETAILS:
594            return None
595        raise AttributeError("%r has no attribute %r" % (self, attr))
596
597    @property
598    def warnings(self):
599        return self._warnings[self._last:]
600
601    def reset(self):
602        self._last = len(self._warnings)
603
604
605def _filterwarnings(filters, quiet=False):
606    """Catch the warnings, then check if all the expected
607    warnings have been raised and re-raise unexpected warnings.
608    If 'quiet' is True, only re-raise the unexpected warnings.
609    """
610    # Clear the warning registry of the calling module
611    # in order to re-raise the warnings.
612    frame = sys._getframe(2)
613    registry = frame.f_globals.get('__warningregistry__')
614    if registry:
615        registry.clear()
616    with warnings.catch_warnings(record=True) as w:
617        # Set filter "always" to record all warnings.  Because
618        # test_warnings swap the module, we need to look up in
619        # the sys.modules dictionary.
620        sys.modules['warnings'].simplefilter("always")
621        yield WarningsRecorder(w)
622    # Filter the recorded warnings
623    reraise = [warning.message for warning in w]
624    missing = []
625    for msg, cat in filters:
626        seen = False
627        for exc in reraise[:]:
628            message = str(exc)
629            # Filter out the matching messages
630            if (re.match(msg, message, re.I) and
631                issubclass(exc.__class__, cat)):
632                seen = True
633                reraise.remove(exc)
634        if not seen and not quiet:
635            # This filter caught nothing
636            missing.append((msg, cat.__name__))
637    if reraise:
638        raise AssertionError("unhandled warning %r" % reraise[0])
639    if missing:
640        raise AssertionError("filter (%r, %s) did not catch any warning" %
641                             missing[0])
642
643
644@contextlib.contextmanager
645def check_warnings(*filters, **kwargs):
646    """Context manager to silence warnings.
647
648    Accept 2-tuples as positional arguments:
649        ("message regexp", WarningCategory)
650
651    Optional argument:
652     - if 'quiet' is True, it does not fail if a filter catches nothing
653        (default True without argument,
654         default False if some filters are defined)
655
656    Without argument, it defaults to:
657        check_warnings(("", Warning), quiet=True)
658    """
659    quiet = kwargs.get('quiet')
660    if not filters:
661        filters = (("", Warning),)
662        # Preserve backward compatibility
663        if quiet is None:
664            quiet = True
665    return _filterwarnings(filters, quiet)
666
667
668@contextlib.contextmanager
669def check_py3k_warnings(*filters, **kwargs):
670    """Context manager to silence py3k warnings.
671
672    Accept 2-tuples as positional arguments:
673        ("message regexp", WarningCategory)
674
675    Optional argument:
676     - if 'quiet' is True, it does not fail if a filter catches nothing
677        (default False)
678
679    Without argument, it defaults to:
680        check_py3k_warnings(("", DeprecationWarning), quiet=False)
681    """
682    if sys.py3kwarning:
683        if not filters:
684            filters = (("", DeprecationWarning),)
685    else:
686        # It should not raise any py3k warning
687        filters = ()
688    return _filterwarnings(filters, kwargs.get('quiet'))
689
690
691class CleanImport(object):
692    """Context manager to force import to return a new module reference.
693
694    This is useful for testing module-level behaviours, such as
695    the emission of a DeprecationWarning on import.
696
697    Use like this:
698
699        with CleanImport("foo"):
700            importlib.import_module("foo") # new reference
701    """
702
703    def __init__(self, *module_names):
704        self.original_modules = sys.modules.copy()
705        for module_name in module_names:
706            if module_name in sys.modules:
707                module = sys.modules[module_name]
708                # It is possible that module_name is just an alias for
709                # another module (e.g. stub for modules renamed in 3.x).
710                # In that case, we also need delete the real module to clear
711                # the import cache.
712                if module.__name__ != module_name:
713                    del sys.modules[module.__name__]
714                del sys.modules[module_name]
715
716    def __enter__(self):
717        return self
718
719    def __exit__(self, *ignore_exc):
720        sys.modules.update(self.original_modules)
721
722
723class EnvironmentVarGuard(UserDict.DictMixin):
724
725    """Class to help protect the environment variable properly.  Can be used as
726    a context manager."""
727
728    def __init__(self):
729        self._environ = os.environ
730        self._changed = {}
731
732    def __getitem__(self, envvar):
733        return self._environ[envvar]
734
735    def __setitem__(self, envvar, value):
736        # Remember the initial value on the first access
737        if envvar not in self._changed:
738            self._changed[envvar] = self._environ.get(envvar)
739        self._environ[envvar] = value
740
741    def __delitem__(self, envvar):
742        # Remember the initial value on the first access
743        if envvar not in self._changed:
744            self._changed[envvar] = self._environ.get(envvar)
745        if envvar in self._environ:
746            del self._environ[envvar]
747
748    def keys(self):
749        return self._environ.keys()
750
751    def set(self, envvar, value):
752        self[envvar] = value
753
754    def unset(self, envvar):
755        del self[envvar]
756
757    def __enter__(self):
758        return self
759
760    def __exit__(self, *ignore_exc):
761        for (k, v) in self._changed.items():
762            if v is None:
763                if k in self._environ:
764                    del self._environ[k]
765            else:
766                self._environ[k] = v
767        os.environ = self._environ
768
769
770class DirsOnSysPath(object):
771    """Context manager to temporarily add directories to sys.path.
772
773    This makes a copy of sys.path, appends any directories given
774    as positional arguments, then reverts sys.path to the copied
775    settings when the context ends.
776
777    Note that *all* sys.path modifications in the body of the
778    context manager, including replacement of the object,
779    will be reverted at the end of the block.
780    """
781
782    def __init__(self, *paths):
783        self.original_value = sys.path[:]
784        self.original_object = sys.path
785        sys.path.extend(paths)
786
787    def __enter__(self):
788        return self
789
790    def __exit__(self, *ignore_exc):
791        sys.path = self.original_object
792        sys.path[:] = self.original_value
793
794
795class TransientResource(object):
796
797    """Raise ResourceDenied if an exception is raised while the context manager
798    is in effect that matches the specified exception and attributes."""
799
800    def __init__(self, exc, **kwargs):
801        self.exc = exc
802        self.attrs = kwargs
803
804    def __enter__(self):
805        return self
806
807    def __exit__(self, type_=None, value=None, traceback=None):
808        """If type_ is a subclass of self.exc and value has attributes matching
809        self.attrs, raise ResourceDenied.  Otherwise let the exception
810        propagate (if any)."""
811        if type_ is not None and issubclass(self.exc, type_):
812            for attr, attr_value in self.attrs.iteritems():
813                if not hasattr(value, attr):
814                    break
815                if getattr(value, attr) != attr_value:
816                    break
817            else:
818                raise ResourceDenied("an optional resource is not available")
819
820
821@contextlib.contextmanager
822def transient_internet(resource_name, timeout=30.0, errnos=()):
823    """Return a context manager that raises ResourceDenied when various issues
824    with the Internet connection manifest themselves as exceptions."""
825    default_errnos = [
826        ('ECONNREFUSED', 111),
827        ('ECONNRESET', 104),
828        ('EHOSTUNREACH', 113),
829        ('ENETUNREACH', 101),
830        ('ETIMEDOUT', 110),
831    ]
832    default_gai_errnos = [
833        ('EAI_AGAIN', -3),
834        ('EAI_FAIL', -4),
835        ('EAI_NONAME', -2),
836        ('EAI_NODATA', -5),
837        # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
838        # implementation actually returns WSANO_DATA i.e. 11004.
839        ('WSANO_DATA', 11004),
840    ]
841
842    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
843    captured_errnos = errnos
844    gai_errnos = []
845    if not captured_errnos:
846        captured_errnos = [getattr(errno, name, num)
847                           for (name, num) in default_errnos]
848        gai_errnos = [getattr(socket, name, num)
849                      for (name, num) in default_gai_errnos]
850
851    def filter_error(err):
852        n = getattr(err, 'errno', None)
853        if (isinstance(err, socket.timeout) or
854            (isinstance(err, socket.gaierror) and n in gai_errnos) or
855            n in captured_errnos):
856            if not verbose:
857                sys.stderr.write(denied.args[0] + "\n")
858            raise denied
859
860    old_timeout = socket.getdefaulttimeout()
861    try:
862        if timeout is not None:
863            socket.setdefaulttimeout(timeout)
864        yield
865    except IOError as err:
866        # urllib can wrap original socket errors multiple times (!), we must
867        # unwrap to get at the original error.
868        while True:
869            a = err.args
870            if len(a) >= 1 and isinstance(a[0], IOError):
871                err = a[0]
872            # The error can also be wrapped as args[1]:
873            #    except socket.error as msg:
874            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
875            elif len(a) >= 2 and isinstance(a[1], IOError):
876                err = a[1]
877            else:
878                break
879        filter_error(err)
880        raise
881    # XXX should we catch generic exceptions and look for their
882    # __cause__ or __context__?
883    finally:
884        socket.setdefaulttimeout(old_timeout)
885
886
887@contextlib.contextmanager
888def captured_output(stream_name):
889    """Return a context manager used by captured_stdout and captured_stdin
890    that temporarily replaces the sys stream *stream_name* with a StringIO."""
891    import StringIO
892    orig_stdout = getattr(sys, stream_name)
893    setattr(sys, stream_name, StringIO.StringIO())
894    try:
895        yield getattr(sys, stream_name)
896    finally:
897        setattr(sys, stream_name, orig_stdout)
898
899def captured_stdout():
900    """Capture the output of sys.stdout:
901
902       with captured_stdout() as s:
903           print "hello"
904       self.assertEqual(s.getvalue(), "hello")
905    """
906    return captured_output("stdout")
907
908def captured_stderr():
909    return captured_output("stderr")
910
911def captured_stdin():
912    return captured_output("stdin")
913
914def gc_collect():
915    """Force as many objects as possible to be collected.
916
917    In non-CPython implementations of Python, this is needed because timely
918    deallocation is not guaranteed by the garbage collector.  (Even in CPython
919    this can be the case in case of reference cycles.)  This means that __del__
920    methods may be called later than expected and weakrefs may remain alive for
921    longer than expected.  This function tries its best to force all garbage
922    objects to disappear.
923    """
924    gc.collect()
925    if is_jython:
926        time.sleep(0.1)
927    gc.collect()
928    gc.collect()
929
930
931_header = '2P'
932if hasattr(sys, "gettotalrefcount"):
933    _header = '2P' + _header
934_vheader = _header + 'P'
935
936def calcobjsize(fmt):
937    return struct.calcsize(_header + fmt + '0P')
938
939def calcvobjsize(fmt):
940    return struct.calcsize(_vheader + fmt + '0P')
941
942
943_TPFLAGS_HAVE_GC = 1<<14
944_TPFLAGS_HEAPTYPE = 1<<9
945
946def check_sizeof(test, o, size):
947    result = sys.getsizeof(o)
948    # add GC header size
949    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
950        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
951        size += _testcapi.SIZEOF_PYGC_HEAD
952    msg = 'wrong size for %s: got %d, expected %d' \
953            % (type(o), result, size)
954    test.assertEqual(result, size, msg)
955
956
957#=======================================================================
958# Decorator for running a function in a different locale, correctly resetting
959# it afterwards.
960
961def run_with_locale(catstr, *locales):
962    def decorator(func):
963        def inner(*args, **kwds):
964            try:
965                import locale
966                category = getattr(locale, catstr)
967                orig_locale = locale.setlocale(category)
968            except AttributeError:
969                # if the test author gives us an invalid category string
970                raise
971            except:
972                # cannot retrieve original locale, so do nothing
973                locale = orig_locale = None
974            else:
975                for loc in locales:
976                    try:
977                        locale.setlocale(category, loc)
978                        break
979                    except:
980                        pass
981
982            # now run the function, resetting the locale on exceptions
983            try:
984                return func(*args, **kwds)
985            finally:
986                if locale and orig_locale:
987                    locale.setlocale(category, orig_locale)
988        inner.func_name = func.func_name
989        inner.__doc__ = func.__doc__
990        return inner
991    return decorator
992
993#=======================================================================
994# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
995
996# Some handy shorthands. Note that these are used for byte-limits as well
997# as size-limits, in the various bigmem tests
998_1M = 1024*1024
999_1G = 1024 * _1M
1000_2G = 2 * _1G
1001_4G = 4 * _1G
1002
1003MAX_Py_ssize_t = sys.maxsize
1004
1005def set_memlimit(limit):
1006    global max_memuse
1007    global real_max_memuse
1008    sizes = {
1009        'k': 1024,
1010        'm': _1M,
1011        'g': _1G,
1012        't': 1024*_1G,
1013    }
1014    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1015                 re.IGNORECASE | re.VERBOSE)
1016    if m is None:
1017        raise ValueError('Invalid memory limit %r' % (limit,))
1018    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1019    real_max_memuse = memlimit
1020    if memlimit > MAX_Py_ssize_t:
1021        memlimit = MAX_Py_ssize_t
1022    if memlimit < _2G - 1:
1023        raise ValueError('Memory limit %r too low to be useful' % (limit,))
1024    max_memuse = memlimit
1025
1026def bigmemtest(minsize, memuse, overhead=5*_1M):
1027    """Decorator for bigmem tests.
1028
1029    'minsize' is the minimum useful size for the test (in arbitrary,
1030    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
1031    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
1032    independent of the testsize, and defaults to 5Mb.
1033
1034    The decorator tries to guess a good value for 'size' and passes it to
1035    the decorated test function. If minsize * memuse is more than the
1036    allowed memory use (as defined by max_memuse), the test is skipped.
1037    Otherwise, minsize is adjusted upward to use up to max_memuse.
1038    """
1039    def decorator(f):
1040        def wrapper(self):
1041            if not max_memuse:
1042                # If max_memuse is 0 (the default),
1043                # we still want to run the tests with size set to a few kb,
1044                # to make sure they work. We still want to avoid using
1045                # too much memory, though, but we do that noisily.
1046                maxsize = 5147
1047                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
1048            else:
1049                maxsize = int((max_memuse - overhead) / memuse)
1050                if maxsize < minsize:
1051                    # Really ought to print 'test skipped' or something
1052                    if verbose:
1053                        sys.stderr.write("Skipping %s because of memory "
1054                                         "constraint\n" % (f.__name__,))
1055                    return
1056                # Try to keep some breathing room in memory use
1057                maxsize = max(maxsize - 50 * _1M, minsize)
1058            return f(self, maxsize)
1059        wrapper.minsize = minsize
1060        wrapper.memuse = memuse
1061        wrapper.overhead = overhead
1062        return wrapper
1063    return decorator
1064
1065def precisionbigmemtest(size, memuse, overhead=5*_1M):
1066    def decorator(f):
1067        def wrapper(self):
1068            if not real_max_memuse:
1069                maxsize = 5147
1070            else:
1071                maxsize = size
1072
1073                if real_max_memuse and real_max_memuse < maxsize * memuse:
1074                    if verbose:
1075                        sys.stderr.write("Skipping %s because of memory "
1076                                         "constraint\n" % (f.__name__,))
1077                    return
1078
1079            return f(self, maxsize)
1080        wrapper.size = size
1081        wrapper.memuse = memuse
1082        wrapper.overhead = overhead
1083        return wrapper
1084    return decorator
1085
1086def bigaddrspacetest(f):
1087    """Decorator for tests that fill the address space."""
1088    def wrapper(self):
1089        if max_memuse < MAX_Py_ssize_t:
1090            if verbose:
1091                sys.stderr.write("Skipping %s because of memory "
1092                                 "constraint\n" % (f.__name__,))
1093        else:
1094            return f(self)
1095    return wrapper
1096
1097#=======================================================================
1098# unittest integration.
1099
1100class BasicTestRunner:
1101    def run(self, test):
1102        result = unittest.TestResult()
1103        test(result)
1104        return result
1105
1106def _id(obj):
1107    return obj
1108
1109def requires_resource(resource):
1110    if is_resource_enabled(resource):
1111        return _id
1112    else:
1113        return unittest.skip("resource {0!r} is not enabled".format(resource))
1114
1115def cpython_only(test):
1116    """
1117    Decorator for tests only applicable on CPython.
1118    """
1119    return impl_detail(cpython=True)(test)
1120
1121def impl_detail(msg=None, **guards):
1122    if check_impl_detail(**guards):
1123        return _id
1124    if msg is None:
1125        guardnames, default = _parse_guards(guards)
1126        if default:
1127            msg = "implementation detail not available on {0}"
1128        else:
1129            msg = "implementation detail specific to {0}"
1130        guardnames = sorted(guardnames.keys())
1131        msg = msg.format(' or '.join(guardnames))
1132    return unittest.skip(msg)
1133
1134def _parse_guards(guards):
1135    # Returns a tuple ({platform_name: run_me}, default_value)
1136    if not guards:
1137        return ({'cpython': True}, False)
1138    is_true = guards.values()[0]
1139    assert guards.values() == [is_true] * len(guards)   # all True or all False
1140    return (guards, not is_true)
1141
1142# Use the following check to guard CPython's implementation-specific tests --
1143# or to run them only on the implementation(s) guarded by the arguments.
1144def check_impl_detail(**guards):
1145    """This function returns True or False depending on the host platform.
1146       Examples:
1147          if check_impl_detail():               # only on CPython (default)
1148          if check_impl_detail(jython=True):    # only on Jython
1149          if check_impl_detail(cpython=False):  # everywhere except on CPython
1150    """
1151    guards, default = _parse_guards(guards)
1152    return guards.get(platform.python_implementation().lower(), default)
1153
1154
1155
1156def _run_suite(suite):
1157    """Run tests from a unittest.TestSuite-derived class."""
1158    if verbose:
1159        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
1160    else:
1161        runner = BasicTestRunner()
1162
1163    result = runner.run(suite)
1164    if not result.wasSuccessful():
1165        if len(result.errors) == 1 and not result.failures:
1166            err = result.errors[0][1]
1167        elif len(result.failures) == 1 and not result.errors:
1168            err = result.failures[0][1]
1169        else:
1170            err = "multiple errors occurred"
1171            if not verbose:
1172                err += "; run in verbose mode for details"
1173        raise TestFailed(err)
1174
1175
1176def run_unittest(*classes):
1177    """Run tests from unittest.TestCase-derived classes."""
1178    valid_types = (unittest.TestSuite, unittest.TestCase)
1179    suite = unittest.TestSuite()
1180    for cls in classes:
1181        if isinstance(cls, str):
1182            if cls in sys.modules:
1183                suite.addTest(unittest.findTestCases(sys.modules[cls]))
1184            else:
1185                raise ValueError("str arguments must be keys in sys.modules")
1186        elif isinstance(cls, valid_types):
1187            suite.addTest(cls)
1188        else:
1189            suite.addTest(unittest.makeSuite(cls))
1190    _run_suite(suite)
1191
1192#=======================================================================
1193# Check for the presence of docstrings.
1194
1195HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
1196                   sys.platform == 'win32' or
1197                   sysconfig.get_config_var('WITH_DOC_STRINGS'))
1198
1199requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1200                                          "test requires docstrings")
1201
1202
1203#=======================================================================
1204# doctest driver.
1205
1206def run_doctest(module, verbosity=None):
1207    """Run doctest on the given module.  Return (#failures, #tests).
1208
1209    If optional argument verbosity is not specified (or is None), pass
1210    test_support's belief about verbosity on to doctest.  Else doctest's
1211    usual behavior is used (it searches sys.argv for -v).
1212    """
1213
1214    import doctest
1215
1216    if verbosity is None:
1217        verbosity = verbose
1218    else:
1219        verbosity = None
1220
1221    # Direct doctest output (normally just errors) to real stdout; doctest
1222    # output shouldn't be compared by regrtest.
1223    save_stdout = sys.stdout
1224    sys.stdout = get_original_stdout()
1225    try:
1226        f, t = doctest.testmod(module, verbose=verbosity)
1227        if f:
1228            raise TestFailed("%d of %d doctests failed" % (f, t))
1229    finally:
1230        sys.stdout = save_stdout
1231    if verbose:
1232        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
1233    return f, t
1234
1235#=======================================================================
1236# Threading support to prevent reporting refleaks when running regrtest.py -R
1237
1238# NOTE: we use thread._count() rather than threading.enumerate() (or the
1239# moral equivalent thereof) because a threading.Thread object is still alive
1240# until its __bootstrap() method has returned, even after it has been
1241# unregistered from the threading module.
1242# thread._count(), on the other hand, only gets decremented *after* the
1243# __bootstrap() method has returned, which gives us reliable reference counts
1244# at the end of a test run.
1245
1246def threading_setup():
1247    if thread:
1248        return thread._count(),
1249    else:
1250        return 1,
1251
1252def threading_cleanup(nb_threads):
1253    if not thread:
1254        return
1255
1256    _MAX_COUNT = 10
1257    for count in range(_MAX_COUNT):
1258        n = thread._count()
1259        if n == nb_threads:
1260            break
1261        time.sleep(0.1)
1262    # XXX print a warning in case of failure?
1263
1264def reap_threads(func):
1265    """Use this function when threads are being used.  This will
1266    ensure that the threads are cleaned up even when the test fails.
1267    If threading is unavailable this function does nothing.
1268    """
1269    if not thread:
1270        return func
1271
1272    @functools.wraps(func)
1273    def decorator(*args):
1274        key = threading_setup()
1275        try:
1276            return func(*args)
1277        finally:
1278            threading_cleanup(*key)
1279    return decorator
1280
1281def reap_children():
1282    """Use this function at the end of test_main() whenever sub-processes
1283    are started.  This will help ensure that no extra children (zombies)
1284    stick around to hog resources and create problems when looking
1285    for refleaks.
1286    """
1287
1288    # Reap all our dead child processes so we don't leave zombies around.
1289    # These hog resources and might be causing some of the buildbots to die.
1290    if hasattr(os, 'waitpid'):
1291        any_process = -1
1292        while True:
1293            try:
1294                # This will raise an exception on Windows.  That's ok.
1295                pid, status = os.waitpid(any_process, os.WNOHANG)
1296                if pid == 0:
1297                    break
1298            except:
1299                break
1300
1301@contextlib.contextmanager
1302def swap_attr(obj, attr, new_val):
1303    """Temporary swap out an attribute with a new object.
1304
1305    Usage:
1306        with swap_attr(obj, "attr", 5):
1307            ...
1308
1309        This will set obj.attr to 5 for the duration of the with: block,
1310        restoring the old value at the end of the block. If `attr` doesn't
1311        exist on `obj`, it will be created and then deleted at the end of the
1312        block.
1313    """
1314    if hasattr(obj, attr):
1315        real_val = getattr(obj, attr)
1316        setattr(obj, attr, new_val)
1317        try:
1318            yield
1319        finally:
1320            setattr(obj, attr, real_val)
1321    else:
1322        setattr(obj, attr, new_val)
1323        try:
1324            yield
1325        finally:
1326            delattr(obj, attr)
1327
1328def py3k_bytes(b):
1329    """Emulate the py3k bytes() constructor.
1330
1331    NOTE: This is only a best effort function.
1332    """
1333    try:
1334        # memoryview?
1335        return b.tobytes()
1336    except AttributeError:
1337        try:
1338            # iterable of ints?
1339            return b"".join(chr(x) for x in b)
1340        except TypeError:
1341            return bytes(b)
1342
1343def args_from_interpreter_flags():
1344    """Return a list of command-line arguments reproducing the current
1345    settings in sys.flags."""
1346    flag_opt_map = {
1347        'bytes_warning': 'b',
1348        'dont_write_bytecode': 'B',
1349        'ignore_environment': 'E',
1350        'no_user_site': 's',
1351        'no_site': 'S',
1352        'optimize': 'O',
1353        'py3k_warning': '3',
1354        'verbose': 'v',
1355    }
1356    args = []
1357    for flag, opt in flag_opt_map.items():
1358        v = getattr(sys.flags, flag)
1359        if v > 0:
1360            args.append('-' + opt * v)
1361    return args
1362
1363def strip_python_stderr(stderr):
1364    """Strip the stderr of a Python process from potential debug output
1365    emitted by the interpreter.
1366
1367    This will typically be run on the result of the communicate() method
1368    of a subprocess.Popen object.
1369    """
1370    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
1371    return stderr
1372